Add chat-side CRM diff panel with keep/rollback flow

This commit is contained in:
Ruslan Bakiev
2026-02-19 05:22:16 +07:00
parent a09acc62a0
commit d9c994c408
7 changed files with 709 additions and 7 deletions

View File

@@ -5,6 +5,7 @@ import { prisma } from "../utils/prisma";
import { datasetRoot } from "../dataset/paths";
import { ensureDataset } from "../dataset/exporter";
import { runLangGraphCrmAgentFor } from "./langgraphCrmAgent";
import type { ChangeSet } from "../utils/changeSet";
type ContactIndexRow = {
id: string;
@@ -243,6 +244,7 @@ export async function persistChatMessage(input: {
output: string;
at: string;
}>;
changeSet?: ChangeSet | null;
teamId: string;
conversationId: string;
authorUserId?: string | null;
@@ -251,7 +253,8 @@ export async function persistChatMessage(input: {
(input.plan && input.plan.length) ||
(input.tools && input.tools.length) ||
(input.thinking && input.thinking.length) ||
(input.toolRuns && input.toolRuns.length),
(input.toolRuns && input.toolRuns.length) ||
input.changeSet,
);
const data: Prisma.ChatMessageCreateInput = {
team: { connect: { id: input.teamId } },
@@ -265,6 +268,7 @@ export async function persistChatMessage(input: {
tools: input.tools ?? [],
thinking: input.thinking ?? input.plan ?? [],
toolRuns: input.toolRuns ?? [],
changeSet: input.changeSet ?? null,
} as any)
: undefined,
};

View File

@@ -6,6 +6,8 @@ import { prisma } from "../utils/prisma";
import { normalizePhone, verifyPassword } from "../utils/password";
import { persistChatMessage, runCrmAgentFor } from "../agent/crmAgent";
import type { AgentTraceEvent } from "../agent/crmAgent";
import { buildChangeSet, captureSnapshot, rollbackChangeSet } from "../utils/changeSet";
import type { ChangeSet } from "../utils/changeSet";
type GraphQLContext = {
auth: AuthContext | null;
@@ -218,6 +220,7 @@ async function getChatMessages(auth: AuthContext | null) {
return items.map((m) => {
const debug = (m.planJson as any) ?? {};
const cs = getChangeSetFromPlanJson(m.planJson);
return {
id: m.id,
role: m.role === "USER" ? "user" : m.role === "ASSISTANT" ? "assistant" : "system",
@@ -236,6 +239,18 @@ async function getChatMessages(auth: AuthContext | null) {
at: t.at ? String(t.at) : m.createdAt.toISOString(),
}))
: [],
changeSetId: cs?.id ?? null,
changeStatus: cs?.status ?? null,
changeSummary: cs?.summary ?? null,
changeItems: Array.isArray(cs?.items)
? cs.items.map((item) => ({
entity: String(item.entity ?? ""),
action: String(item.action ?? ""),
title: String(item.title ?? ""),
before: String(item.before ?? ""),
after: String(item.after ?? ""),
}))
: [],
createdAt: m.createdAt.toISOString(),
};
});
@@ -498,11 +513,93 @@ async function updateFeedDecision(auth: AuthContext | null, id: string, decision
return { ok: true, id };
}
function getChangeSetFromPlanJson(planJson: unknown): ChangeSet | null {
const debug = (planJson as any) ?? {};
const cs = debug?.changeSet;
if (!cs || typeof cs !== "object") return null;
if (!cs.id || !Array.isArray(cs.items) || !Array.isArray(cs.undo)) return null;
return cs as ChangeSet;
}
async function findLatestChangeCarrierMessage(auth: AuthContext | null) {
const ctx = requireAuth(auth);
const items = await prisma.chatMessage.findMany({
where: {
teamId: ctx.teamId,
conversationId: ctx.conversationId,
role: "ASSISTANT",
},
orderBy: { createdAt: "desc" },
take: 30,
});
for (const item of items) {
const changeSet = getChangeSetFromPlanJson(item.planJson);
if (!changeSet) continue;
if (changeSet.status === "rolled_back") continue;
return { item, changeSet };
}
return null;
}
async function confirmLatestChangeSet(auth: AuthContext | null) {
const found = await findLatestChangeCarrierMessage(auth);
if (!found) return { ok: true };
const { item, changeSet } = found;
if (changeSet.status === "confirmed") return { ok: true };
const next = {
...((item.planJson as any) ?? {}),
changeSet: {
...changeSet,
status: "confirmed",
confirmedAt: new Date().toISOString(),
},
};
await prisma.chatMessage.update({
where: { id: item.id },
data: { planJson: next as any },
});
return { ok: true };
}
async function rollbackLatestChangeSet(auth: AuthContext | null) {
const ctx = requireAuth(auth);
const found = await findLatestChangeCarrierMessage(ctx);
if (!found) return { ok: true };
const { item, changeSet } = found;
if (changeSet.status === "rolled_back") return { ok: true };
await rollbackChangeSet(prisma, ctx.teamId, changeSet);
const next = {
...((item.planJson as any) ?? {}),
changeSet: {
...changeSet,
status: "rolled_back",
rolledBackAt: new Date().toISOString(),
},
};
await prisma.chatMessage.update({
where: { id: item.id },
data: { planJson: next as any },
});
return { ok: true };
}
async function sendPilotMessage(auth: AuthContext | null, textInput: string) {
const ctx = requireAuth(auth);
const text = (textInput ?? "").trim();
if (!text) throw new Error("text is required");
const snapshotBefore = await captureSnapshot(prisma, ctx.teamId);
await persistChatMessage({
teamId: ctx.teamId,
conversationId: ctx.conversationId,
@@ -529,6 +626,10 @@ async function sendPilotMessage(auth: AuthContext | null, textInput: string) {
});
},
});
const snapshotAfter = await captureSnapshot(prisma, ctx.teamId);
const changeSet = buildChangeSet(snapshotBefore, snapshotAfter);
await persistChatMessage({
teamId: ctx.teamId,
conversationId: ctx.conversationId,
@@ -539,6 +640,7 @@ async function sendPilotMessage(auth: AuthContext | null, textInput: string) {
thinking: reply.thinking ?? [],
tools: reply.tools,
toolRuns: reply.toolRuns ?? [],
changeSet,
});
return { ok: true };
@@ -578,6 +680,8 @@ export const crmGraphqlSchema = buildSchema(`
createChatConversation(title: String): Conversation!
selectChatConversation(id: ID!): MutationResult!
sendPilotMessage(text: String!): MutationResult!
confirmLatestChangeSet: MutationResult!
rollbackLatestChangeSet: MutationResult!
logPilotNote(text: String!): MutationResult!
createCalendarEvent(input: CreateCalendarEventInput!): CalendarEvent!
createCommunication(input: CreateCommunicationInput!): MutationWithIdResult!
@@ -647,9 +751,21 @@ export const crmGraphqlSchema = buildSchema(`
thinking: [String!]!
tools: [String!]!
toolRuns: [PilotToolRun!]!
changeSetId: String
changeStatus: String
changeSummary: String
changeItems: [PilotChangeItem!]!
createdAt: String!
}
type PilotChangeItem {
entity: String!
action: String!
title: String!
before: String!
after: String!
}
type PilotToolRun {
name: String!
status: String!
@@ -770,6 +886,12 @@ export const crmGraphqlRoot = {
sendPilotMessage: async (args: { text: string }, context: GraphQLContext) =>
sendPilotMessage(context.auth, args.text),
confirmLatestChangeSet: async (_args: unknown, context: GraphQLContext) =>
confirmLatestChangeSet(context.auth),
rollbackLatestChangeSet: async (_args: unknown, context: GraphQLContext) =>
rollbackLatestChangeSet(context.auth),
logPilotNote: async (args: { text: string }, context: GraphQLContext) =>
logPilotNote(context.auth, args.text),

View File

@@ -0,0 +1,415 @@
import { randomUUID } from "node:crypto";
import type { PrismaClient } from "@prisma/client";
type CalendarSnapshotRow = {
id: string;
teamId: string;
contactId: string | null;
title: string;
startsAt: string;
endsAt: string | null;
note: string | null;
status: string | null;
};
type ContactNoteSnapshotRow = {
contactId: string;
contactName: string;
content: string;
};
type MessageSnapshotRow = {
id: string;
contactId: string;
contactName: string;
kind: string;
direction: string;
channel: string;
content: string;
durationSec: number | null;
occurredAt: string;
};
type DealSnapshotRow = {
id: string;
title: string;
contactName: string;
stage: string;
nextStep: string | null;
summary: string | null;
};
export type SnapshotState = {
calendarById: Map<string, CalendarSnapshotRow>;
noteByContactId: Map<string, ContactNoteSnapshotRow>;
messageById: Map<string, MessageSnapshotRow>;
dealById: Map<string, DealSnapshotRow>;
};
export type ChangeItem = {
entity: "calendar_event" | "contact_note" | "message" | "deal";
action: "created" | "updated" | "deleted";
title: string;
before: string;
after: string;
};
type UndoOp =
| { kind: "delete_calendar_event"; id: string }
| { kind: "restore_calendar_event"; data: CalendarSnapshotRow }
| { kind: "delete_contact_message"; id: string }
| { kind: "restore_contact_message"; data: MessageSnapshotRow }
| { kind: "restore_contact_note"; contactId: string; content: string | null }
| { kind: "restore_deal"; id: string; stage: string; nextStep: string | null; summary: string | null };
export type ChangeSet = {
id: string;
status: "pending" | "confirmed" | "rolled_back";
createdAt: string;
summary: string;
items: ChangeItem[];
undo: UndoOp[];
};
function fmt(val: string | null | undefined) {
return (val ?? "").trim();
}
function toCalendarText(row: CalendarSnapshotRow) {
const when = new Date(row.startsAt).toLocaleString("ru-RU");
return `${row.title} · ${when}${row.note ? ` · ${row.note}` : ""}`;
}
function toMessageText(row: MessageSnapshotRow) {
const when = new Date(row.occurredAt).toLocaleString("ru-RU");
return `${row.contactName} · ${row.channel} · ${row.kind.toLowerCase()} · ${when} · ${row.content}`;
}
function toDealText(row: DealSnapshotRow) {
return `${row.title} (${row.contactName}) · ${row.stage}${row.nextStep ? ` · next: ${row.nextStep}` : ""}`;
}
export async function captureSnapshot(prisma: PrismaClient, teamId: string): Promise<SnapshotState> {
const [calendar, notes, messages, deals] = await Promise.all([
prisma.calendarEvent.findMany({
where: { teamId },
select: {
id: true,
teamId: true,
contactId: true,
title: true,
startsAt: true,
endsAt: true,
note: true,
status: true,
},
take: 4000,
}),
prisma.contactNote.findMany({
where: { contact: { teamId } },
select: { contactId: true, content: true, contact: { select: { name: true } } },
take: 4000,
}),
prisma.contactMessage.findMany({
where: { contact: { teamId } },
include: { contact: { select: { name: true } } },
orderBy: { createdAt: "asc" },
take: 6000,
}),
prisma.deal.findMany({
where: { teamId },
include: { contact: { select: { name: true } } },
take: 4000,
}),
]);
return {
calendarById: new Map(
calendar.map((row) => [
row.id,
{
id: row.id,
teamId: row.teamId,
contactId: row.contactId ?? null,
title: row.title,
startsAt: row.startsAt.toISOString(),
endsAt: row.endsAt?.toISOString() ?? null,
note: row.note ?? null,
status: row.status ?? null,
},
]),
),
noteByContactId: new Map(
notes.map((row) => [
row.contactId,
{
contactId: row.contactId,
contactName: row.contact.name,
content: row.content,
},
]),
),
messageById: new Map(
messages.map((row) => [
row.id,
{
id: row.id,
contactId: row.contactId,
contactName: row.contact.name,
kind: row.kind,
direction: row.direction,
channel: row.channel,
content: row.content,
durationSec: row.durationSec ?? null,
occurredAt: row.occurredAt.toISOString(),
},
]),
),
dealById: new Map(
deals.map((row) => [
row.id,
{
id: row.id,
title: row.title,
contactName: row.contact.name,
stage: row.stage,
nextStep: row.nextStep ?? null,
summary: row.summary ?? null,
},
]),
),
};
}
export function buildChangeSet(before: SnapshotState, after: SnapshotState): ChangeSet | null {
const items: ChangeItem[] = [];
const undo: UndoOp[] = [];
for (const [id, row] of after.calendarById) {
const prev = before.calendarById.get(id);
if (!prev) {
items.push({
entity: "calendar_event",
action: "created",
title: `Event created: ${row.title}`,
before: "",
after: toCalendarText(row),
});
undo.push({ kind: "delete_calendar_event", id });
continue;
}
if (
prev.title !== row.title ||
prev.startsAt !== row.startsAt ||
prev.endsAt !== row.endsAt ||
fmt(prev.note) !== fmt(row.note) ||
fmt(prev.status) !== fmt(row.status) ||
prev.contactId !== row.contactId
) {
items.push({
entity: "calendar_event",
action: "updated",
title: `Event updated: ${row.title}`,
before: toCalendarText(prev),
after: toCalendarText(row),
});
undo.push({ kind: "restore_calendar_event", data: prev });
}
}
for (const [id, row] of before.calendarById) {
if (after.calendarById.has(id)) continue;
items.push({
entity: "calendar_event",
action: "deleted",
title: `Event archived: ${row.title}`,
before: toCalendarText(row),
after: "",
});
undo.push({ kind: "restore_calendar_event", data: row });
}
for (const [contactId, row] of after.noteByContactId) {
const prev = before.noteByContactId.get(contactId);
if (!prev) {
items.push({
entity: "contact_note",
action: "created",
title: `Summary added: ${row.contactName}`,
before: "",
after: row.content,
});
undo.push({ kind: "restore_contact_note", contactId, content: null });
continue;
}
if (prev.content !== row.content) {
items.push({
entity: "contact_note",
action: "updated",
title: `Summary updated: ${row.contactName}`,
before: prev.content,
after: row.content,
});
undo.push({ kind: "restore_contact_note", contactId, content: prev.content });
}
}
for (const [contactId, row] of before.noteByContactId) {
if (after.noteByContactId.has(contactId)) continue;
items.push({
entity: "contact_note",
action: "deleted",
title: `Summary cleared: ${row.contactName}`,
before: row.content,
after: "",
});
undo.push({ kind: "restore_contact_note", contactId, content: row.content });
}
for (const [id, row] of after.messageById) {
if (before.messageById.has(id)) continue;
items.push({
entity: "message",
action: "created",
title: `Message created: ${row.contactName}`,
before: "",
after: toMessageText(row),
});
undo.push({ kind: "delete_contact_message", id });
}
for (const [id, row] of after.dealById) {
const prev = before.dealById.get(id);
if (!prev) continue;
if (prev.stage !== row.stage || fmt(prev.nextStep) !== fmt(row.nextStep) || fmt(prev.summary) !== fmt(row.summary)) {
items.push({
entity: "deal",
action: "updated",
title: `Deal updated: ${row.title}`,
before: toDealText(prev),
after: toDealText(row),
});
undo.push({
kind: "restore_deal",
id,
stage: prev.stage,
nextStep: prev.nextStep,
summary: prev.summary,
});
}
}
if (items.length === 0) return null;
const created = items.filter((x) => x.action === "created").length;
const updated = items.filter((x) => x.action === "updated").length;
const deleted = items.filter((x) => x.action === "deleted").length;
return {
id: randomUUID(),
status: "pending",
createdAt: new Date().toISOString(),
summary: `Created: ${created}, Updated: ${updated}, Archived: ${deleted}`,
items,
undo,
};
}
export async function rollbackChangeSet(prisma: PrismaClient, teamId: string, changeSet: ChangeSet) {
const ops = [...changeSet.undo].reverse();
await prisma.$transaction(async (tx) => {
for (const op of ops) {
if (op.kind === "delete_calendar_event") {
await tx.calendarEvent.deleteMany({ where: { id: op.id, teamId } });
continue;
}
if (op.kind === "restore_calendar_event") {
const row = op.data;
await tx.calendarEvent.upsert({
where: { id: row.id },
update: {
teamId: row.teamId,
contactId: row.contactId,
title: row.title,
startsAt: new Date(row.startsAt),
endsAt: row.endsAt ? new Date(row.endsAt) : null,
note: row.note,
status: row.status,
},
create: {
id: row.id,
teamId: row.teamId,
contactId: row.contactId,
title: row.title,
startsAt: new Date(row.startsAt),
endsAt: row.endsAt ? new Date(row.endsAt) : null,
note: row.note,
status: row.status,
},
});
continue;
}
if (op.kind === "delete_contact_message") {
await tx.contactMessage.deleteMany({ where: { id: op.id } });
continue;
}
if (op.kind === "restore_contact_message") {
const row = op.data;
await tx.contactMessage.upsert({
where: { id: row.id },
update: {
contactId: row.contactId,
kind: row.kind as any,
direction: row.direction as any,
channel: row.channel as any,
content: row.content,
durationSec: row.durationSec,
occurredAt: new Date(row.occurredAt),
},
create: {
id: row.id,
contactId: row.contactId,
kind: row.kind as any,
direction: row.direction as any,
channel: row.channel as any,
content: row.content,
durationSec: row.durationSec,
occurredAt: new Date(row.occurredAt),
},
});
continue;
}
if (op.kind === "restore_contact_note") {
const contact = await tx.contact.findFirst({ where: { id: op.contactId, teamId }, select: { id: true } });
if (!contact) continue;
if (op.content === null) {
await tx.contactNote.deleteMany({ where: { contactId: op.contactId } });
} else {
await tx.contactNote.upsert({
where: { contactId: op.contactId },
update: { content: op.content },
create: { contactId: op.contactId, content: op.content },
});
}
continue;
}
if (op.kind === "restore_deal") {
await tx.deal.updateMany({
where: { id: op.id, teamId },
data: {
stage: op.stage,
nextStep: op.nextStep,
summary: op.summary,
},
});
}
}
});
}