diff --git a/Frontend/graphql/operations/chat-messages.graphql b/Frontend/graphql/operations/chat-messages.graphql
index 1b49e8a..7a7113c 100644
--- a/Frontend/graphql/operations/chat-messages.graphql
+++ b/Frontend/graphql/operations/chat-messages.graphql
@@ -13,6 +13,16 @@ query ChatMessagesQuery {
output
at
}
+ changeSetId
+ changeStatus
+ changeSummary
+ changeItems {
+ entity
+ action
+ title
+ before
+ after
+ }
createdAt
}
}
diff --git a/Frontend/graphql/operations/confirm-latest-change-set.graphql b/Frontend/graphql/operations/confirm-latest-change-set.graphql
new file mode 100644
index 0000000..903ba5f
--- /dev/null
+++ b/Frontend/graphql/operations/confirm-latest-change-set.graphql
@@ -0,0 +1,6 @@
+mutation ConfirmLatestChangeSetMutation {
+ confirmLatestChangeSet {
+ ok
+ }
+}
+
diff --git a/Frontend/graphql/operations/rollback-latest-change-set.graphql b/Frontend/graphql/operations/rollback-latest-change-set.graphql
new file mode 100644
index 0000000..9d2d645
--- /dev/null
+++ b/Frontend/graphql/operations/rollback-latest-change-set.graphql
@@ -0,0 +1,6 @@
+mutation RollbackLatestChangeSetMutation {
+ rollbackLatestChangeSet {
+ ok
+ }
+}
+
diff --git a/Frontend/server/agent/crmAgent.ts b/Frontend/server/agent/crmAgent.ts
index 399b285..8692e98 100644
--- a/Frontend/server/agent/crmAgent.ts
+++ b/Frontend/server/agent/crmAgent.ts
@@ -5,6 +5,7 @@ import { prisma } from "../utils/prisma";
import { datasetRoot } from "../dataset/paths";
import { ensureDataset } from "../dataset/exporter";
import { runLangGraphCrmAgentFor } from "./langgraphCrmAgent";
+import type { ChangeSet } from "../utils/changeSet";
type ContactIndexRow = {
id: string;
@@ -243,6 +244,7 @@ export async function persistChatMessage(input: {
output: string;
at: string;
}>;
+ changeSet?: ChangeSet | null;
teamId: string;
conversationId: string;
authorUserId?: string | null;
@@ -251,7 +253,8 @@ export async function persistChatMessage(input: {
(input.plan && input.plan.length) ||
(input.tools && input.tools.length) ||
(input.thinking && input.thinking.length) ||
- (input.toolRuns && input.toolRuns.length),
+ (input.toolRuns && input.toolRuns.length) ||
+ input.changeSet,
);
const data: Prisma.ChatMessageCreateInput = {
team: { connect: { id: input.teamId } },
@@ -265,6 +268,7 @@ export async function persistChatMessage(input: {
tools: input.tools ?? [],
thinking: input.thinking ?? input.plan ?? [],
toolRuns: input.toolRuns ?? [],
+ changeSet: input.changeSet ?? null,
} as any)
: undefined,
};
diff --git a/Frontend/server/graphql/schema.ts b/Frontend/server/graphql/schema.ts
index d56c277..a1bda97 100644
--- a/Frontend/server/graphql/schema.ts
+++ b/Frontend/server/graphql/schema.ts
@@ -6,6 +6,8 @@ import { prisma } from "../utils/prisma";
import { normalizePhone, verifyPassword } from "../utils/password";
import { persistChatMessage, runCrmAgentFor } from "../agent/crmAgent";
import type { AgentTraceEvent } from "../agent/crmAgent";
+import { buildChangeSet, captureSnapshot, rollbackChangeSet } from "../utils/changeSet";
+import type { ChangeSet } from "../utils/changeSet";
type GraphQLContext = {
auth: AuthContext | null;
@@ -218,6 +220,7 @@ async function getChatMessages(auth: AuthContext | null) {
return items.map((m) => {
const debug = (m.planJson as any) ?? {};
+ const cs = getChangeSetFromPlanJson(m.planJson);
return {
id: m.id,
role: m.role === "USER" ? "user" : m.role === "ASSISTANT" ? "assistant" : "system",
@@ -236,6 +239,18 @@ async function getChatMessages(auth: AuthContext | null) {
at: t.at ? String(t.at) : m.createdAt.toISOString(),
}))
: [],
+ changeSetId: cs?.id ?? null,
+ changeStatus: cs?.status ?? null,
+ changeSummary: cs?.summary ?? null,
+ changeItems: Array.isArray(cs?.items)
+ ? cs.items.map((item) => ({
+ entity: String(item.entity ?? ""),
+ action: String(item.action ?? ""),
+ title: String(item.title ?? ""),
+ before: String(item.before ?? ""),
+ after: String(item.after ?? ""),
+ }))
+ : [],
createdAt: m.createdAt.toISOString(),
};
});
@@ -498,11 +513,93 @@ async function updateFeedDecision(auth: AuthContext | null, id: string, decision
return { ok: true, id };
}
+function getChangeSetFromPlanJson(planJson: unknown): ChangeSet | null {
+ const debug = (planJson as any) ?? {};
+ const cs = debug?.changeSet;
+ if (!cs || typeof cs !== "object") return null;
+ if (!cs.id || !Array.isArray(cs.items) || !Array.isArray(cs.undo)) return null;
+ return cs as ChangeSet;
+}
+
+async function findLatestChangeCarrierMessage(auth: AuthContext | null) {
+ const ctx = requireAuth(auth);
+ const items = await prisma.chatMessage.findMany({
+ where: {
+ teamId: ctx.teamId,
+ conversationId: ctx.conversationId,
+ role: "ASSISTANT",
+ },
+ orderBy: { createdAt: "desc" },
+ take: 30,
+ });
+
+ for (const item of items) {
+ const changeSet = getChangeSetFromPlanJson(item.planJson);
+ if (!changeSet) continue;
+ if (changeSet.status === "rolled_back") continue;
+ return { item, changeSet };
+ }
+ return null;
+}
+
+async function confirmLatestChangeSet(auth: AuthContext | null) {
+ const found = await findLatestChangeCarrierMessage(auth);
+ if (!found) return { ok: true };
+
+ const { item, changeSet } = found;
+ if (changeSet.status === "confirmed") return { ok: true };
+
+ const next = {
+ ...((item.planJson as any) ?? {}),
+ changeSet: {
+ ...changeSet,
+ status: "confirmed",
+ confirmedAt: new Date().toISOString(),
+ },
+ };
+
+ await prisma.chatMessage.update({
+ where: { id: item.id },
+ data: { planJson: next as any },
+ });
+
+ return { ok: true };
+}
+
+async function rollbackLatestChangeSet(auth: AuthContext | null) {
+ const ctx = requireAuth(auth);
+ const found = await findLatestChangeCarrierMessage(ctx);
+ if (!found) return { ok: true };
+
+ const { item, changeSet } = found;
+ if (changeSet.status === "rolled_back") return { ok: true };
+
+ await rollbackChangeSet(prisma, ctx.teamId, changeSet);
+
+ const next = {
+ ...((item.planJson as any) ?? {}),
+ changeSet: {
+ ...changeSet,
+ status: "rolled_back",
+ rolledBackAt: new Date().toISOString(),
+ },
+ };
+
+ await prisma.chatMessage.update({
+ where: { id: item.id },
+ data: { planJson: next as any },
+ });
+
+ return { ok: true };
+}
+
async function sendPilotMessage(auth: AuthContext | null, textInput: string) {
const ctx = requireAuth(auth);
const text = (textInput ?? "").trim();
if (!text) throw new Error("text is required");
+ const snapshotBefore = await captureSnapshot(prisma, ctx.teamId);
+
await persistChatMessage({
teamId: ctx.teamId,
conversationId: ctx.conversationId,
@@ -529,6 +626,10 @@ async function sendPilotMessage(auth: AuthContext | null, textInput: string) {
});
},
});
+
+ const snapshotAfter = await captureSnapshot(prisma, ctx.teamId);
+ const changeSet = buildChangeSet(snapshotBefore, snapshotAfter);
+
await persistChatMessage({
teamId: ctx.teamId,
conversationId: ctx.conversationId,
@@ -539,6 +640,7 @@ async function sendPilotMessage(auth: AuthContext | null, textInput: string) {
thinking: reply.thinking ?? [],
tools: reply.tools,
toolRuns: reply.toolRuns ?? [],
+ changeSet,
});
return { ok: true };
@@ -578,6 +680,8 @@ export const crmGraphqlSchema = buildSchema(`
createChatConversation(title: String): Conversation!
selectChatConversation(id: ID!): MutationResult!
sendPilotMessage(text: String!): MutationResult!
+ confirmLatestChangeSet: MutationResult!
+ rollbackLatestChangeSet: MutationResult!
logPilotNote(text: String!): MutationResult!
createCalendarEvent(input: CreateCalendarEventInput!): CalendarEvent!
createCommunication(input: CreateCommunicationInput!): MutationWithIdResult!
@@ -647,9 +751,21 @@ export const crmGraphqlSchema = buildSchema(`
thinking: [String!]!
tools: [String!]!
toolRuns: [PilotToolRun!]!
+ changeSetId: String
+ changeStatus: String
+ changeSummary: String
+ changeItems: [PilotChangeItem!]!
createdAt: String!
}
+ type PilotChangeItem {
+ entity: String!
+ action: String!
+ title: String!
+ before: String!
+ after: String!
+ }
+
type PilotToolRun {
name: String!
status: String!
@@ -770,6 +886,12 @@ export const crmGraphqlRoot = {
sendPilotMessage: async (args: { text: string }, context: GraphQLContext) =>
sendPilotMessage(context.auth, args.text),
+ confirmLatestChangeSet: async (_args: unknown, context: GraphQLContext) =>
+ confirmLatestChangeSet(context.auth),
+
+ rollbackLatestChangeSet: async (_args: unknown, context: GraphQLContext) =>
+ rollbackLatestChangeSet(context.auth),
+
logPilotNote: async (args: { text: string }, context: GraphQLContext) =>
logPilotNote(context.auth, args.text),
diff --git a/Frontend/server/utils/changeSet.ts b/Frontend/server/utils/changeSet.ts
new file mode 100644
index 0000000..10a0d66
--- /dev/null
+++ b/Frontend/server/utils/changeSet.ts
@@ -0,0 +1,415 @@
+import { randomUUID } from "node:crypto";
+import type { PrismaClient } from "@prisma/client";
+
+type CalendarSnapshotRow = {
+ id: string;
+ teamId: string;
+ contactId: string | null;
+ title: string;
+ startsAt: string;
+ endsAt: string | null;
+ note: string | null;
+ status: string | null;
+};
+
+type ContactNoteSnapshotRow = {
+ contactId: string;
+ contactName: string;
+ content: string;
+};
+
+type MessageSnapshotRow = {
+ id: string;
+ contactId: string;
+ contactName: string;
+ kind: string;
+ direction: string;
+ channel: string;
+ content: string;
+ durationSec: number | null;
+ occurredAt: string;
+};
+
+type DealSnapshotRow = {
+ id: string;
+ title: string;
+ contactName: string;
+ stage: string;
+ nextStep: string | null;
+ summary: string | null;
+};
+
+export type SnapshotState = {
+ calendarById: Map;
+ noteByContactId: Map;
+ messageById: Map;
+ dealById: Map;
+};
+
+export type ChangeItem = {
+ entity: "calendar_event" | "contact_note" | "message" | "deal";
+ action: "created" | "updated" | "deleted";
+ title: string;
+ before: string;
+ after: string;
+};
+
+type UndoOp =
+ | { kind: "delete_calendar_event"; id: string }
+ | { kind: "restore_calendar_event"; data: CalendarSnapshotRow }
+ | { kind: "delete_contact_message"; id: string }
+ | { kind: "restore_contact_message"; data: MessageSnapshotRow }
+ | { kind: "restore_contact_note"; contactId: string; content: string | null }
+ | { kind: "restore_deal"; id: string; stage: string; nextStep: string | null; summary: string | null };
+
+export type ChangeSet = {
+ id: string;
+ status: "pending" | "confirmed" | "rolled_back";
+ createdAt: string;
+ summary: string;
+ items: ChangeItem[];
+ undo: UndoOp[];
+};
+
+function fmt(val: string | null | undefined) {
+ return (val ?? "").trim();
+}
+
+function toCalendarText(row: CalendarSnapshotRow) {
+ const when = new Date(row.startsAt).toLocaleString("ru-RU");
+ return `${row.title} · ${when}${row.note ? ` · ${row.note}` : ""}`;
+}
+
+function toMessageText(row: MessageSnapshotRow) {
+ const when = new Date(row.occurredAt).toLocaleString("ru-RU");
+ return `${row.contactName} · ${row.channel} · ${row.kind.toLowerCase()} · ${when} · ${row.content}`;
+}
+
+function toDealText(row: DealSnapshotRow) {
+ return `${row.title} (${row.contactName}) · ${row.stage}${row.nextStep ? ` · next: ${row.nextStep}` : ""}`;
+}
+
+export async function captureSnapshot(prisma: PrismaClient, teamId: string): Promise {
+ const [calendar, notes, messages, deals] = await Promise.all([
+ prisma.calendarEvent.findMany({
+ where: { teamId },
+ select: {
+ id: true,
+ teamId: true,
+ contactId: true,
+ title: true,
+ startsAt: true,
+ endsAt: true,
+ note: true,
+ status: true,
+ },
+ take: 4000,
+ }),
+ prisma.contactNote.findMany({
+ where: { contact: { teamId } },
+ select: { contactId: true, content: true, contact: { select: { name: true } } },
+ take: 4000,
+ }),
+ prisma.contactMessage.findMany({
+ where: { contact: { teamId } },
+ include: { contact: { select: { name: true } } },
+ orderBy: { createdAt: "asc" },
+ take: 6000,
+ }),
+ prisma.deal.findMany({
+ where: { teamId },
+ include: { contact: { select: { name: true } } },
+ take: 4000,
+ }),
+ ]);
+
+ return {
+ calendarById: new Map(
+ calendar.map((row) => [
+ row.id,
+ {
+ id: row.id,
+ teamId: row.teamId,
+ contactId: row.contactId ?? null,
+ title: row.title,
+ startsAt: row.startsAt.toISOString(),
+ endsAt: row.endsAt?.toISOString() ?? null,
+ note: row.note ?? null,
+ status: row.status ?? null,
+ },
+ ]),
+ ),
+ noteByContactId: new Map(
+ notes.map((row) => [
+ row.contactId,
+ {
+ contactId: row.contactId,
+ contactName: row.contact.name,
+ content: row.content,
+ },
+ ]),
+ ),
+ messageById: new Map(
+ messages.map((row) => [
+ row.id,
+ {
+ id: row.id,
+ contactId: row.contactId,
+ contactName: row.contact.name,
+ kind: row.kind,
+ direction: row.direction,
+ channel: row.channel,
+ content: row.content,
+ durationSec: row.durationSec ?? null,
+ occurredAt: row.occurredAt.toISOString(),
+ },
+ ]),
+ ),
+ dealById: new Map(
+ deals.map((row) => [
+ row.id,
+ {
+ id: row.id,
+ title: row.title,
+ contactName: row.contact.name,
+ stage: row.stage,
+ nextStep: row.nextStep ?? null,
+ summary: row.summary ?? null,
+ },
+ ]),
+ ),
+ };
+}
+
+export function buildChangeSet(before: SnapshotState, after: SnapshotState): ChangeSet | null {
+ const items: ChangeItem[] = [];
+ const undo: UndoOp[] = [];
+
+ for (const [id, row] of after.calendarById) {
+ const prev = before.calendarById.get(id);
+ if (!prev) {
+ items.push({
+ entity: "calendar_event",
+ action: "created",
+ title: `Event created: ${row.title}`,
+ before: "",
+ after: toCalendarText(row),
+ });
+ undo.push({ kind: "delete_calendar_event", id });
+ continue;
+ }
+ if (
+ prev.title !== row.title ||
+ prev.startsAt !== row.startsAt ||
+ prev.endsAt !== row.endsAt ||
+ fmt(prev.note) !== fmt(row.note) ||
+ fmt(prev.status) !== fmt(row.status) ||
+ prev.contactId !== row.contactId
+ ) {
+ items.push({
+ entity: "calendar_event",
+ action: "updated",
+ title: `Event updated: ${row.title}`,
+ before: toCalendarText(prev),
+ after: toCalendarText(row),
+ });
+ undo.push({ kind: "restore_calendar_event", data: prev });
+ }
+ }
+
+ for (const [id, row] of before.calendarById) {
+ if (after.calendarById.has(id)) continue;
+ items.push({
+ entity: "calendar_event",
+ action: "deleted",
+ title: `Event archived: ${row.title}`,
+ before: toCalendarText(row),
+ after: "",
+ });
+ undo.push({ kind: "restore_calendar_event", data: row });
+ }
+
+ for (const [contactId, row] of after.noteByContactId) {
+ const prev = before.noteByContactId.get(contactId);
+ if (!prev) {
+ items.push({
+ entity: "contact_note",
+ action: "created",
+ title: `Summary added: ${row.contactName}`,
+ before: "",
+ after: row.content,
+ });
+ undo.push({ kind: "restore_contact_note", contactId, content: null });
+ continue;
+ }
+ if (prev.content !== row.content) {
+ items.push({
+ entity: "contact_note",
+ action: "updated",
+ title: `Summary updated: ${row.contactName}`,
+ before: prev.content,
+ after: row.content,
+ });
+ undo.push({ kind: "restore_contact_note", contactId, content: prev.content });
+ }
+ }
+
+ for (const [contactId, row] of before.noteByContactId) {
+ if (after.noteByContactId.has(contactId)) continue;
+ items.push({
+ entity: "contact_note",
+ action: "deleted",
+ title: `Summary cleared: ${row.contactName}`,
+ before: row.content,
+ after: "",
+ });
+ undo.push({ kind: "restore_contact_note", contactId, content: row.content });
+ }
+
+ for (const [id, row] of after.messageById) {
+ if (before.messageById.has(id)) continue;
+ items.push({
+ entity: "message",
+ action: "created",
+ title: `Message created: ${row.contactName}`,
+ before: "",
+ after: toMessageText(row),
+ });
+ undo.push({ kind: "delete_contact_message", id });
+ }
+
+ for (const [id, row] of after.dealById) {
+ const prev = before.dealById.get(id);
+ if (!prev) continue;
+ if (prev.stage !== row.stage || fmt(prev.nextStep) !== fmt(row.nextStep) || fmt(prev.summary) !== fmt(row.summary)) {
+ items.push({
+ entity: "deal",
+ action: "updated",
+ title: `Deal updated: ${row.title}`,
+ before: toDealText(prev),
+ after: toDealText(row),
+ });
+ undo.push({
+ kind: "restore_deal",
+ id,
+ stage: prev.stage,
+ nextStep: prev.nextStep,
+ summary: prev.summary,
+ });
+ }
+ }
+
+ if (items.length === 0) return null;
+
+ const created = items.filter((x) => x.action === "created").length;
+ const updated = items.filter((x) => x.action === "updated").length;
+ const deleted = items.filter((x) => x.action === "deleted").length;
+
+ return {
+ id: randomUUID(),
+ status: "pending",
+ createdAt: new Date().toISOString(),
+ summary: `Created: ${created}, Updated: ${updated}, Archived: ${deleted}`,
+ items,
+ undo,
+ };
+}
+
+export async function rollbackChangeSet(prisma: PrismaClient, teamId: string, changeSet: ChangeSet) {
+ const ops = [...changeSet.undo].reverse();
+
+ await prisma.$transaction(async (tx) => {
+ for (const op of ops) {
+ if (op.kind === "delete_calendar_event") {
+ await tx.calendarEvent.deleteMany({ where: { id: op.id, teamId } });
+ continue;
+ }
+
+ if (op.kind === "restore_calendar_event") {
+ const row = op.data;
+ await tx.calendarEvent.upsert({
+ where: { id: row.id },
+ update: {
+ teamId: row.teamId,
+ contactId: row.contactId,
+ title: row.title,
+ startsAt: new Date(row.startsAt),
+ endsAt: row.endsAt ? new Date(row.endsAt) : null,
+ note: row.note,
+ status: row.status,
+ },
+ create: {
+ id: row.id,
+ teamId: row.teamId,
+ contactId: row.contactId,
+ title: row.title,
+ startsAt: new Date(row.startsAt),
+ endsAt: row.endsAt ? new Date(row.endsAt) : null,
+ note: row.note,
+ status: row.status,
+ },
+ });
+ continue;
+ }
+
+ if (op.kind === "delete_contact_message") {
+ await tx.contactMessage.deleteMany({ where: { id: op.id } });
+ continue;
+ }
+
+ if (op.kind === "restore_contact_message") {
+ const row = op.data;
+ await tx.contactMessage.upsert({
+ where: { id: row.id },
+ update: {
+ contactId: row.contactId,
+ kind: row.kind as any,
+ direction: row.direction as any,
+ channel: row.channel as any,
+ content: row.content,
+ durationSec: row.durationSec,
+ occurredAt: new Date(row.occurredAt),
+ },
+ create: {
+ id: row.id,
+ contactId: row.contactId,
+ kind: row.kind as any,
+ direction: row.direction as any,
+ channel: row.channel as any,
+ content: row.content,
+ durationSec: row.durationSec,
+ occurredAt: new Date(row.occurredAt),
+ },
+ });
+ continue;
+ }
+
+ if (op.kind === "restore_contact_note") {
+ const contact = await tx.contact.findFirst({ where: { id: op.contactId, teamId }, select: { id: true } });
+ if (!contact) continue;
+ if (op.content === null) {
+ await tx.contactNote.deleteMany({ where: { contactId: op.contactId } });
+ } else {
+ await tx.contactNote.upsert({
+ where: { contactId: op.contactId },
+ update: { content: op.content },
+ create: { contactId: op.contactId, content: op.content },
+ });
+ }
+ continue;
+ }
+
+ if (op.kind === "restore_deal") {
+ await tx.deal.updateMany({
+ where: { id: op.id, teamId },
+ data: {
+ stage: op.stage,
+ nextStep: op.nextStep,
+ summary: op.summary,
+ },
+ });
+ }
+ }
+ });
+}
+