refactor ai naming and make omni raw-json first

This commit is contained in:
Ruslan Bakiev
2026-02-23 09:32:59 +07:00
parent ab5370c831
commit 43b487ccec
13 changed files with 226 additions and 79 deletions

View File

@@ -68,8 +68,8 @@ model Team {
contacts Contact[] contacts Contact[]
calendarEvents CalendarEvent[] calendarEvents CalendarEvent[]
deals Deal[] deals Deal[]
conversations ChatConversation[] aiConversations AiConversation[]
chatMessages ChatMessage[] aiMessages AiMessage[]
omniThreads OmniThread[] omniThreads OmniThread[]
omniMessages OmniMessage[] omniMessages OmniMessage[]
@@ -90,9 +90,9 @@ model User {
createdAt DateTime @default(now()) createdAt DateTime @default(now())
updatedAt DateTime @updatedAt updatedAt DateTime @updatedAt
memberships TeamMember[] memberships TeamMember[]
conversations ChatConversation[] @relation("ConversationCreator") aiConversations AiConversation[] @relation("ConversationCreator")
chatMessages ChatMessage[] @relation("ChatAuthor") aiMessages AiMessage[] @relation("ChatAuthor")
} }
model TeamMember { model TeamMember {
@@ -305,7 +305,7 @@ model DealStep {
@@index([status, dueAt]) @@index([status, dueAt])
} }
model ChatConversation { model AiConversation {
id String @id @default(cuid()) id String @id @default(cuid())
teamId String teamId String
createdByUserId String createdByUserId String
@@ -313,15 +313,16 @@ model ChatConversation {
createdAt DateTime @default(now()) createdAt DateTime @default(now())
updatedAt DateTime @updatedAt updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
createdByUser User @relation("ConversationCreator", fields: [createdByUserId], references: [id], onDelete: Cascade) createdByUser User @relation("ConversationCreator", fields: [createdByUserId], references: [id], onDelete: Cascade)
messages ChatMessage[] messages AiMessage[]
@@index([teamId, updatedAt]) @@index([teamId, updatedAt])
@@index([createdByUserId]) @@index([createdByUserId])
@@map("ChatConversation")
} }
model ChatMessage { model AiMessage {
id String @id @default(cuid()) id String @id @default(cuid())
teamId String teamId String
conversationId String conversationId String
@@ -331,13 +332,14 @@ model ChatMessage {
planJson Json? planJson Json?
createdAt DateTime @default(now()) createdAt DateTime @default(now())
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
conversation ChatConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade) conversation AiConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade)
authorUser User? @relation("ChatAuthor", fields: [authorUserId], references: [id], onDelete: SetNull) authorUser User? @relation("ChatAuthor", fields: [authorUserId], references: [id], onDelete: SetNull)
@@index([createdAt]) @@index([createdAt])
@@index([teamId, createdAt]) @@index([teamId, createdAt])
@@index([conversationId, createdAt]) @@index([conversationId, createdAt])
@@map("ChatMessage")
} }
model FeedCard { model FeedCard {

View File

@@ -123,7 +123,7 @@ async function main() {
create: { teamId: team.id, userId: user.id, role: "OWNER" }, create: { teamId: team.id, userId: user.id, role: "OWNER" },
}); });
const conversation = await prisma.chatConversation.upsert({ const conversation = await prisma.aiConversation.upsert({
where: { id: `pilot-${team.id}` }, where: { id: `pilot-${team.id}` },
update: { title: "Пилот" }, update: { title: "Пилот" },
create: { id: `pilot-${team.id}`, teamId: team.id, createdByUserId: user.id, title: "Пилот" }, create: { id: `pilot-${team.id}`, teamId: team.id, createdByUserId: user.id, title: "Пилот" },
@@ -136,7 +136,7 @@ async function main() {
prisma.deal.deleteMany({ where: { teamId: team.id } }), prisma.deal.deleteMany({ where: { teamId: team.id } }),
prisma.calendarEvent.deleteMany({ where: { teamId: team.id } }), prisma.calendarEvent.deleteMany({ where: { teamId: team.id } }),
prisma.contactMessage.deleteMany({ where: { contact: { teamId: team.id } } }), prisma.contactMessage.deleteMany({ where: { contact: { teamId: team.id } } }),
prisma.chatMessage.deleteMany({ where: { teamId: team.id, conversationId: conversation.id } }), prisma.aiMessage.deleteMany({ where: { teamId: team.id, conversationId: conversation.id } }),
prisma.omniMessage.deleteMany({ where: { teamId: team.id } }), prisma.omniMessage.deleteMany({ where: { teamId: team.id } }),
prisma.omniThread.deleteMany({ where: { teamId: team.id } }), prisma.omniThread.deleteMany({ where: { teamId: team.id } }),
prisma.omniContactIdentity.deleteMany({ where: { teamId: team.id } }), prisma.omniContactIdentity.deleteMany({ where: { teamId: team.id } }),

View File

@@ -255,7 +255,7 @@ export async function runCrmAgentFor(
); );
} }
export async function persistChatMessage(input: { export async function persistAiMessage(input: {
role: ChatRole; role: ChatRole;
text: string; text: string;
plan?: string[]; plan?: string[];
@@ -279,7 +279,7 @@ export async function persistChatMessage(input: {
authorUserId?: string | null; authorUserId?: string | null;
}) { }) {
const hasStoredPayload = Boolean(input.changeSet || input.messageKind); const hasStoredPayload = Boolean(input.changeSet || input.messageKind);
const data: Prisma.ChatMessageCreateInput = { const data: Prisma.AiMessageCreateInput = {
team: { connect: { id: input.teamId } }, team: { connect: { id: input.teamId } },
conversation: { connect: { id: input.conversationId } }, conversation: { connect: { id: input.conversationId } },
authorUser: input.authorUserId ? { connect: { id: input.authorUserId } } : undefined, authorUser: input.authorUserId ? { connect: { id: input.authorUserId } } : undefined,
@@ -292,5 +292,5 @@ export async function persistChatMessage(input: {
} as any) } as any)
: undefined, : undefined,
}; };
return prisma.chatMessage.create({ data }); return prisma.aiMessage.create({ data });
} }

View File

@@ -3,7 +3,7 @@ import { createUIMessageStream, createUIMessageStreamResponse } from "ai";
import { getAuthContext } from "../utils/auth"; import { getAuthContext } from "../utils/auth";
import { prisma } from "../utils/prisma"; import { prisma } from "../utils/prisma";
import { buildChangeSet, captureSnapshot } from "../utils/changeSet"; import { buildChangeSet, captureSnapshot } from "../utils/changeSet";
import { persistChatMessage, runCrmAgentFor, type AgentTraceEvent } from "../agent/crmAgent"; import { persistAiMessage, runCrmAgentFor, type AgentTraceEvent } from "../agent/crmAgent";
import type { PilotContextPayload } from "../agent/crmAgent"; import type { PilotContextPayload } from "../agent/crmAgent";
import type { ChangeSet } from "../utils/changeSet"; import type { ChangeSet } from "../utils/changeSet";
@@ -143,7 +143,7 @@ export default defineEventHandler(async (event) => {
try { try {
const snapshotBefore = await captureSnapshot(prisma, auth.teamId); const snapshotBefore = await captureSnapshot(prisma, auth.teamId);
await persistChatMessage({ await persistAiMessage({
teamId: auth.teamId, teamId: auth.teamId,
conversationId: auth.conversationId, conversationId: auth.conversationId,
authorUserId: auth.userId, authorUserId: auth.userId,
@@ -177,7 +177,7 @@ export default defineEventHandler(async (event) => {
const snapshotAfter = await captureSnapshot(prisma, auth.teamId); const snapshotAfter = await captureSnapshot(prisma, auth.teamId);
const changeSet = buildChangeSet(snapshotBefore, snapshotAfter); const changeSet = buildChangeSet(snapshotBefore, snapshotAfter);
await persistChatMessage({ await persistAiMessage({
teamId: auth.teamId, teamId: auth.teamId,
conversationId: auth.conversationId, conversationId: auth.conversationId,
authorUserId: null, authorUserId: null,
@@ -190,7 +190,7 @@ export default defineEventHandler(async (event) => {
}); });
if (changeSet) { if (changeSet) {
await persistChatMessage({ await persistAiMessage({
teamId: auth.teamId, teamId: auth.teamId,
conversationId: auth.conversationId, conversationId: auth.conversationId,
authorUserId: null, authorUserId: null,
@@ -212,7 +212,7 @@ export default defineEventHandler(async (event) => {
} catch (error: any) { } catch (error: any) {
const errorText = String(error?.message ?? error); const errorText = String(error?.message ?? error);
await persistChatMessage({ await persistAiMessage({
teamId: auth.teamId, teamId: auth.teamId,
conversationId: auth.conversationId, conversationId: auth.conversationId,
authorUserId: null, authorUserId: null,

View File

@@ -4,7 +4,7 @@ import type { AuthContext } from "../utils/auth";
import { clearAuthSession, setSession } from "../utils/auth"; import { clearAuthSession, setSession } from "../utils/auth";
import { prisma } from "../utils/prisma"; import { prisma } from "../utils/prisma";
import { normalizePhone, verifyPassword } from "../utils/password"; import { normalizePhone, verifyPassword } from "../utils/password";
import { persistChatMessage, runCrmAgentFor } from "../agent/crmAgent"; import { persistAiMessage, runCrmAgentFor } from "../agent/crmAgent";
import { buildChangeSet, captureSnapshot, rollbackChangeSet, rollbackChangeSetItems } from "../utils/changeSet"; import { buildChangeSet, captureSnapshot, rollbackChangeSet, rollbackChangeSetItems } from "../utils/changeSet";
import type { ChangeSet } from "../utils/changeSet"; import type { ChangeSet } from "../utils/changeSet";
import { enqueueTelegramSend } from "../queues/telegramSend"; import { enqueueTelegramSend } from "../queues/telegramSend";
@@ -38,6 +38,30 @@ function toDbChannel(channel: string) {
return "PHONE"; return "PHONE";
} }
function asObject(value: unknown): Record<string, unknown> {
if (!value || typeof value !== "object" || Array.isArray(value)) return {};
return value as Record<string, unknown>;
}
function readNestedString(obj: Record<string, unknown>, path: string[]): string {
let current: unknown = obj;
for (const segment of path) {
if (!current || typeof current !== "object" || Array.isArray(current)) return "";
current = (current as Record<string, unknown>)[segment];
}
return typeof current === "string" ? current.trim() : "";
}
function extractOmniNormalizedText(rawJson: unknown, fallbackText = "") {
const raw = asObject(rawJson);
return (
readNestedString(raw, ["normalized", "text"]) ||
readNestedString(raw, ["payloadNormalized", "text"]) ||
readNestedString(raw, ["deliveryRequest", "payload", "text"]) ||
String(fallbackText ?? "").trim()
);
}
async function loginWithPassword(event: H3Event, phoneInput: string, passwordInput: string) { async function loginWithPassword(event: H3Event, phoneInput: string, passwordInput: string) {
const phone = normalizePhone(phoneInput); const phone = normalizePhone(phoneInput);
const password = (passwordInput ?? "").trim(); const password = (passwordInput ?? "").trim();
@@ -69,11 +93,11 @@ async function loginWithPassword(event: H3Event, phoneInput: string, passwordInp
} }
const conversation = const conversation =
(await prisma.chatConversation.findFirst({ (await prisma.aiConversation.findFirst({
where: { teamId: membership.teamId, createdByUserId: user.id }, where: { teamId: membership.teamId, createdByUserId: user.id },
orderBy: { createdAt: "desc" }, orderBy: { createdAt: "desc" },
})) || })) ||
(await prisma.chatConversation.create({ (await prisma.aiConversation.create({
data: { teamId: membership.teamId, createdByUserId: user.id, title: "Pilot" }, data: { teamId: membership.teamId, createdByUserId: user.id, title: "Pilot" },
})); }));
@@ -91,7 +115,7 @@ async function getAuthPayload(auth: AuthContext | null) {
const [user, team, conv] = await Promise.all([ const [user, team, conv] = await Promise.all([
prisma.user.findUnique({ where: { id: ctx.userId } }), prisma.user.findUnique({ where: { id: ctx.userId } }),
prisma.team.findUnique({ where: { id: ctx.teamId } }), prisma.team.findUnique({ where: { id: ctx.teamId } }),
prisma.chatConversation.findUnique({ prisma.aiConversation.findUnique({
where: { id: ctx.conversationId }, where: { id: ctx.conversationId },
include: { messages: { orderBy: { createdAt: "desc" }, take: 1, select: { text: true, createdAt: true } } }, include: { messages: { orderBy: { createdAt: "desc" }, take: 1, select: { text: true, createdAt: true } } },
}), }),
@@ -130,7 +154,7 @@ function defaultConversationTitle(input?: string | null) {
async function getChatConversations(auth: AuthContext | null) { async function getChatConversations(auth: AuthContext | null) {
const ctx = requireAuth(auth); const ctx = requireAuth(auth);
const items = await prisma.chatConversation.findMany({ const items = await prisma.aiConversation.findMany({
where: { teamId: ctx.teamId, createdByUserId: ctx.userId }, where: { teamId: ctx.teamId, createdByUserId: ctx.userId },
include: { include: {
messages: { messages: {
@@ -161,7 +185,7 @@ async function getChatConversations(auth: AuthContext | null) {
async function createChatConversation(auth: AuthContext | null, event: H3Event, titleInput?: string | null) { async function createChatConversation(auth: AuthContext | null, event: H3Event, titleInput?: string | null) {
const ctx = requireAuth(auth); const ctx = requireAuth(auth);
const conversation = await prisma.chatConversation.create({ const conversation = await prisma.aiConversation.create({
data: { data: {
teamId: ctx.teamId, teamId: ctx.teamId,
createdByUserId: ctx.userId, createdByUserId: ctx.userId,
@@ -190,7 +214,7 @@ async function selectChatConversation(auth: AuthContext | null, event: H3Event,
const convId = (id ?? "").trim(); const convId = (id ?? "").trim();
if (!convId) throw new Error("id is required"); if (!convId) throw new Error("id is required");
const conversation = await prisma.chatConversation.findFirst({ const conversation = await prisma.aiConversation.findFirst({
where: { where: {
id: convId, id: convId,
teamId: ctx.teamId, teamId: ctx.teamId,
@@ -215,7 +239,7 @@ async function archiveChatConversation(auth: AuthContext | null, event: H3Event,
const convId = (id ?? "").trim(); const convId = (id ?? "").trim();
if (!convId) throw new Error("id is required"); if (!convId) throw new Error("id is required");
const conversation = await prisma.chatConversation.findFirst({ const conversation = await prisma.aiConversation.findFirst({
where: { where: {
id: convId, id: convId,
teamId: ctx.teamId, teamId: ctx.teamId,
@@ -227,13 +251,13 @@ async function archiveChatConversation(auth: AuthContext | null, event: H3Event,
if (!conversation) throw new Error("conversation not found"); if (!conversation) throw new Error("conversation not found");
const nextConversationId = await prisma.$transaction(async (tx) => { const nextConversationId = await prisma.$transaction(async (tx) => {
await tx.chatConversation.delete({ where: { id: conversation.id } }); await tx.aiConversation.delete({ where: { id: conversation.id } });
if (ctx.conversationId !== conversation.id) { if (ctx.conversationId !== conversation.id) {
return ctx.conversationId; return ctx.conversationId;
} }
const created = await tx.chatConversation.create({ const created = await tx.aiConversation.create({
data: { teamId: ctx.teamId, createdByUserId: ctx.userId, title: "Pilot" }, data: { teamId: ctx.teamId, createdByUserId: ctx.userId, title: "Pilot" },
select: { id: true }, select: { id: true },
}); });
@@ -251,7 +275,7 @@ async function archiveChatConversation(auth: AuthContext | null, event: H3Event,
async function getChatMessages(auth: AuthContext | null) { async function getChatMessages(auth: AuthContext | null) {
const ctx = requireAuth(auth); const ctx = requireAuth(auth);
const items = await prisma.chatMessage.findMany({ const items = await prisma.aiMessage.findMany({
where: { teamId: ctx.teamId, conversationId: ctx.conversationId }, where: { teamId: ctx.teamId, conversationId: ctx.conversationId },
orderBy: { createdAt: "asc" }, orderBy: { createdAt: "asc" },
take: 200, take: 200,
@@ -363,6 +387,7 @@ async function getDashboard(auth: AuthContext | null) {
channel: string; channel: string;
direction: string; direction: string;
text: string; text: string;
rawJson: unknown;
status: string; status: string;
occurredAt: Date; occurredAt: Date;
updatedAt: Date; updatedAt: Date;
@@ -390,6 +415,7 @@ async function getDashboard(auth: AuthContext | null) {
channel: true, channel: true,
direction: true, direction: true,
text: true, text: true,
rawJson: true,
status: true, status: true,
occurredAt: true, occurredAt: true,
updatedAt: true, updatedAt: true,
@@ -421,7 +447,8 @@ async function getDashboard(auth: AuthContext | null) {
const omniByKey = new Map<string, typeof omniMessagesRaw>(); const omniByKey = new Map<string, typeof omniMessagesRaw>();
for (const row of omniMessagesRaw) { for (const row of omniMessagesRaw) {
const key = [row.contactId, row.channel, row.direction, row.text.trim()].join("|"); const normalizedText = extractOmniNormalizedText(row.rawJson, row.text);
const key = [row.contactId, row.channel, row.direction, normalizedText].join("|");
if (!omniByKey.has(key)) omniByKey.set(key, []); if (!omniByKey.has(key)) omniByKey.set(key, []);
omniByKey.get(key)?.push(row); omniByKey.get(key)?.push(row);
} }
@@ -703,7 +730,19 @@ async function createCommunication(auth: AuthContext | null, input: {
providerMessageId: null, providerMessageId: null,
providerUpdateId: null, providerUpdateId: null,
rawJson: { rawJson: {
version: 1,
source: "graphql.createCommunication", source: "graphql.createCommunication",
provider: "telegram_business",
normalized: {
channel: "TELEGRAM",
direction: "OUT",
text: content,
},
payloadNormalized: {
contactId: contact.id,
threadId: thread.id,
text: content,
},
enqueuedAt: new Date().toISOString(), enqueuedAt: new Date().toISOString(),
}, },
occurredAt, occurredAt,
@@ -715,14 +754,22 @@ async function createCommunication(auth: AuthContext | null, input: {
await enqueueTelegramSend({ omniMessageId: omniMessage.id }); await enqueueTelegramSend({ omniMessageId: omniMessage.id });
} catch (error) { } catch (error) {
const message = error instanceof Error ? error.message : String(error); const message = error instanceof Error ? error.message : String(error);
const existingOmni = await prisma.omniMessage.findUnique({
where: { id: omniMessage.id },
select: { rawJson: true },
});
await prisma.omniMessage.update({ await prisma.omniMessage.update({
where: { id: omniMessage.id }, where: { id: omniMessage.id },
data: { data: {
status: "FAILED", status: "FAILED",
rawJson: { rawJson: {
...asObject(existingOmni?.rawJson),
source: "graphql.createCommunication", source: "graphql.createCommunication",
deliveryError: { message }, delivery: {
failedAt: new Date().toISOString(), status: "FAILED",
error: { message },
failedAt: new Date().toISOString(),
},
}, },
}, },
}).catch(() => undefined); }).catch(() => undefined);
@@ -859,7 +906,7 @@ function renderChangeSetSummary(changeSet: ChangeSet): string {
async function findLatestChangeCarrierMessage(auth: AuthContext | null) { async function findLatestChangeCarrierMessage(auth: AuthContext | null) {
const ctx = requireAuth(auth); const ctx = requireAuth(auth);
const items = await prisma.chatMessage.findMany({ const items = await prisma.aiMessage.findMany({
where: { where: {
teamId: ctx.teamId, teamId: ctx.teamId,
conversationId: ctx.conversationId, conversationId: ctx.conversationId,
@@ -883,7 +930,7 @@ async function findChangeCarrierMessageByChangeSetId(auth: AuthContext | null, c
const targetId = String(changeSetId ?? "").trim(); const targetId = String(changeSetId ?? "").trim();
if (!targetId) return null; if (!targetId) return null;
const items = await prisma.chatMessage.findMany({ const items = await prisma.aiMessage.findMany({
where: { where: {
teamId: ctx.teamId, teamId: ctx.teamId,
conversationId: ctx.conversationId, conversationId: ctx.conversationId,
@@ -919,7 +966,7 @@ async function confirmLatestChangeSet(auth: AuthContext | null) {
}, },
}; };
await prisma.chatMessage.update({ await prisma.aiMessage.update({
where: { id: item.id }, where: { id: item.id },
data: { planJson: next as any }, data: { planJson: next as any },
}); });
@@ -951,7 +998,7 @@ async function rollbackLatestChangeSet(auth: AuthContext | null) {
}, },
}; };
await prisma.chatMessage.update({ await prisma.aiMessage.update({
where: { id: item.id }, where: { id: item.id },
data: { planJson: next as any }, data: { planJson: next as any },
}); });
@@ -994,7 +1041,7 @@ async function rollbackChangeSetItemsMutation(auth: AuthContext | null, changeSe
}, },
}; };
await prisma.chatMessage.update({ await prisma.aiMessage.update({
where: { id: item.id }, where: { id: item.id },
data: { planJson: next as any }, data: { planJson: next as any },
}); });
@@ -1010,7 +1057,7 @@ async function sendPilotMessage(auth: AuthContext | null, textInput: string) {
const snapshotBefore = await captureSnapshot(prisma, ctx.teamId); const snapshotBefore = await captureSnapshot(prisma, ctx.teamId);
await persistChatMessage({ await persistAiMessage({
teamId: ctx.teamId, teamId: ctx.teamId,
conversationId: ctx.conversationId, conversationId: ctx.conversationId,
authorUserId: ctx.userId, authorUserId: ctx.userId,
@@ -1034,7 +1081,7 @@ async function sendPilotMessage(auth: AuthContext | null, textInput: string) {
const snapshotAfter = await captureSnapshot(prisma, ctx.teamId); const snapshotAfter = await captureSnapshot(prisma, ctx.teamId);
const changeSet = buildChangeSet(snapshotBefore, snapshotAfter); const changeSet = buildChangeSet(snapshotBefore, snapshotAfter);
await persistChatMessage({ await persistAiMessage({
teamId: ctx.teamId, teamId: ctx.teamId,
conversationId: ctx.conversationId, conversationId: ctx.conversationId,
authorUserId: null, authorUserId: null,
@@ -1047,7 +1094,7 @@ async function sendPilotMessage(auth: AuthContext | null, textInput: string) {
}); });
if (changeSet) { if (changeSet) {
await persistChatMessage({ await persistAiMessage({
teamId: ctx.teamId, teamId: ctx.teamId,
conversationId: ctx.conversationId, conversationId: ctx.conversationId,
authorUserId: null, authorUserId: null,
@@ -1070,7 +1117,7 @@ async function logPilotNote(auth: AuthContext | null, textInput: string) {
const text = (textInput ?? "").trim(); const text = (textInput ?? "").trim();
if (!text) throw new Error("text is required"); if (!text) throw new Error("text is required");
await persistChatMessage({ await persistAiMessage({
teamId: ctx.teamId, teamId: ctx.teamId,
conversationId: ctx.conversationId, conversationId: ctx.conversationId,
authorUserId: null, authorUserId: null,

View File

@@ -62,6 +62,11 @@ function extractProviderMessageId(body: unknown): string | null {
return String(candidate); return String(candidate);
} }
function asObject(value: unknown): Record<string, unknown> {
if (!value || typeof value !== "object" || Array.isArray(value)) return {};
return value as Record<string, unknown>;
}
export function outboundDeliveryQueue() { export function outboundDeliveryQueue() {
return new Queue<OutboundDeliveryJob, unknown, "deliver">(OUTBOUND_DELIVERY_QUEUE_NAME, { return new Queue<OutboundDeliveryJob, unknown, "deliver">(OUTBOUND_DELIVERY_QUEUE_NAME, {
connection: redisConnectionFromEnv(), connection: redisConnectionFromEnv(),
@@ -77,17 +82,27 @@ export async function enqueueOutboundDelivery(input: OutboundDeliveryJob, opts?:
const q = outboundDeliveryQueue(); const q = outboundDeliveryQueue();
const payload = (input.payload ?? null) as Prisma.InputJsonValue; const payload = (input.payload ?? null) as Prisma.InputJsonValue;
const existing = await prisma.omniMessage.findUnique({
where: { id: input.omniMessageId },
select: { rawJson: true },
});
const raw = asObject(existing?.rawJson);
const rawQueue = asObject(raw.queue);
const rawDeliveryRequest = asObject(raw.deliveryRequest);
// Keep source message in pending before actual send starts. // Keep source message in pending before actual send starts.
await prisma.omniMessage.update({ await prisma.omniMessage.update({
where: { id: input.omniMessageId }, where: { id: input.omniMessageId },
data: { data: {
status: "PENDING", status: "PENDING",
rawJson: { rawJson: {
...raw,
queue: { queue: {
...rawQueue,
queueName: OUTBOUND_DELIVERY_QUEUE_NAME, queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
enqueuedAt: new Date().toISOString(), enqueuedAt: new Date().toISOString(),
}, },
deliveryRequest: { deliveryRequest: {
...rawDeliveryRequest,
endpoint, endpoint,
method: input.method ?? "POST", method: input.method ?? "POST",
channel: input.channel ?? null, channel: input.channel ?? null,
@@ -151,18 +166,24 @@ export function startOutboundDeliveryWorker() {
} }
const providerMessageId = extractProviderMessageId(responseBody); const providerMessageId = extractProviderMessageId(responseBody);
const raw = asObject(msg.rawJson);
const rawQueue = asObject(raw.queue);
const rawDeliveryRequest = asObject(raw.deliveryRequest);
await prisma.omniMessage.update({ await prisma.omniMessage.update({
where: { id: msg.id }, where: { id: msg.id },
data: { data: {
status: "SENT", status: "SENT",
providerMessageId, providerMessageId,
rawJson: { rawJson: {
...raw,
queue: { queue: {
...rawQueue,
queueName: OUTBOUND_DELIVERY_QUEUE_NAME, queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
completedAt: new Date().toISOString(), completedAt: new Date().toISOString(),
attemptsMade: job.attemptsMade + 1, attemptsMade: job.attemptsMade + 1,
}, },
deliveryRequest: { deliveryRequest: {
...rawDeliveryRequest,
endpoint, endpoint,
method, method,
channel: job.data.channel ?? null, channel: job.data.channel ?? null,
@@ -182,17 +203,23 @@ export function startOutboundDeliveryWorker() {
typeof job.opts.attempts === "number" && job.attemptsMade + 1 >= job.opts.attempts; typeof job.opts.attempts === "number" && job.attemptsMade + 1 >= job.opts.attempts;
if (isLastAttempt) { if (isLastAttempt) {
const raw = asObject(msg.rawJson);
const rawQueue = asObject(raw.queue);
const rawDeliveryRequest = asObject(raw.deliveryRequest);
await prisma.omniMessage.update({ await prisma.omniMessage.update({
where: { id: msg.id }, where: { id: msg.id },
data: { data: {
status: "FAILED", status: "FAILED",
rawJson: { rawJson: {
...raw,
queue: { queue: {
...rawQueue,
queueName: OUTBOUND_DELIVERY_QUEUE_NAME, queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
failedAt: new Date().toISOString(), failedAt: new Date().toISOString(),
attemptsMade: job.attemptsMade + 1, attemptsMade: job.attemptsMade + 1,
}, },
deliveryRequest: { deliveryRequest: {
...rawDeliveryRequest,
endpoint, endpoint,
method, method,
channel: job.data.channel ?? null, channel: job.data.channel ?? null,

View File

@@ -7,6 +7,20 @@ type TelegramSendJob = {
omniMessageId: string; omniMessageId: string;
}; };
function asObject(value: unknown): Record<string, unknown> {
if (!value || typeof value !== "object" || Array.isArray(value)) return {};
return value as Record<string, unknown>;
}
function readNestedString(obj: Record<string, unknown>, path: string[]): string {
let current: unknown = obj;
for (const segment of path) {
if (!current || typeof current !== "object" || Array.isArray(current)) return "";
current = (current as Record<string, unknown>)[segment];
}
return typeof current === "string" ? current.trim() : "";
}
export async function enqueueTelegramSend(input: TelegramSendJob, opts?: JobsOptions) { export async function enqueueTelegramSend(input: TelegramSendJob, opts?: JobsOptions) {
const msg = await prisma.omniMessage.findUnique({ const msg = await prisma.omniMessage.findUnique({
where: { id: input.omniMessageId }, where: { id: input.omniMessageId },
@@ -16,12 +30,20 @@ export async function enqueueTelegramSend(input: TelegramSendJob, opts?: JobsOpt
if (msg.channel !== "TELEGRAM" || msg.direction !== "OUT") { if (msg.channel !== "TELEGRAM" || msg.direction !== "OUT") {
throw new Error(`Invalid omni message for telegram send: ${msg.id}`); throw new Error(`Invalid omni message for telegram send: ${msg.id}`);
} }
const raw = asObject(msg.rawJson);
const text =
readNestedString(raw, ["normalized", "text"]) ||
readNestedString(raw, ["payloadNormalized", "text"]) ||
msg.text;
if (!text) {
throw new Error(`Omni message has empty text payload: ${msg.id}`);
}
const token = requireTelegramBotToken(); const token = requireTelegramBotToken();
const endpoint = `${telegramApiBase()}/bot${token}/sendMessage`; const endpoint = `${telegramApiBase()}/bot${token}/sendMessage`;
const payload = { const payload = {
chat_id: msg.thread.externalChatId, chat_id: msg.thread.externalChatId,
text: msg.text, text,
...(msg.thread.businessConnectionId ? { business_connection_id: msg.thread.businessConnectionId } : {}), ...(msg.thread.businessConnectionId ? { business_connection_id: msg.thread.businessConnectionId } : {}),
}; };

View File

@@ -66,7 +66,7 @@ async function validateSessionFromPeer(peer: any) {
const [user, team, conv] = await Promise.all([ const [user, team, conv] = await Promise.all([
prisma.user.findUnique({ where: { id: userId }, select: { id: true } }), prisma.user.findUnique({ where: { id: userId }, select: { id: true } }),
prisma.team.findUnique({ where: { id: teamId }, select: { id: true } }), prisma.team.findUnique({ where: { id: teamId }, select: { id: true } }),
prisma.chatConversation.findFirst({ prisma.aiConversation.findFirst({
where: { id: conversationId, teamId, createdByUserId: userId }, where: { id: conversationId, teamId, createdByUserId: userId },
select: { id: true }, select: { id: true },
}), }),

View File

@@ -64,7 +64,7 @@ export async function getAuthContext(event: H3Event): Promise<AuthContext> {
throw createError({ statusCode: 401, statusMessage: "Unauthorized" }); throw createError({ statusCode: 401, statusMessage: "Unauthorized" });
} }
const conv = await prisma.chatConversation.findFirst({ const conv = await prisma.aiConversation.findFirst({
where: { id: conversationId, teamId: team.id, createdByUserId: user.id }, where: { id: conversationId, teamId: team.id, createdByUserId: user.id },
}); });
@@ -92,7 +92,7 @@ export async function ensureDemoAuth() {
update: {}, update: {},
create: { teamId: team.id, userId: user.id, role: "OWNER" }, create: { teamId: team.id, userId: user.id, role: "OWNER" },
}); });
const conv = await prisma.chatConversation.upsert({ const conv = await prisma.aiConversation.upsert({
where: { id: `pilot-${team.id}` }, where: { id: `pilot-${team.id}` },
update: {}, update: {},
create: { id: `pilot-${team.id}`, teamId: team.id, createdByUserId: user.id, title: "Pilot" }, create: { id: `pilot-${team.id}`, teamId: team.id, createdByUserId: user.id, title: "Pilot" },

View File

@@ -68,8 +68,8 @@ model Team {
contacts Contact[] contacts Contact[]
calendarEvents CalendarEvent[] calendarEvents CalendarEvent[]
deals Deal[] deals Deal[]
conversations ChatConversation[] aiConversations AiConversation[]
chatMessages ChatMessage[] aiMessages AiMessage[]
omniThreads OmniThread[] omniThreads OmniThread[]
omniMessages OmniMessage[] omniMessages OmniMessage[]
@@ -90,9 +90,9 @@ model User {
createdAt DateTime @default(now()) createdAt DateTime @default(now())
updatedAt DateTime @updatedAt updatedAt DateTime @updatedAt
memberships TeamMember[] memberships TeamMember[]
conversations ChatConversation[] @relation("ConversationCreator") aiConversations AiConversation[] @relation("ConversationCreator")
chatMessages ChatMessage[] @relation("ChatAuthor") aiMessages AiMessage[] @relation("ChatAuthor")
} }
model TeamMember { model TeamMember {
@@ -305,7 +305,7 @@ model DealStep {
@@index([status, dueAt]) @@index([status, dueAt])
} }
model ChatConversation { model AiConversation {
id String @id @default(cuid()) id String @id @default(cuid())
teamId String teamId String
createdByUserId String createdByUserId String
@@ -313,15 +313,16 @@ model ChatConversation {
createdAt DateTime @default(now()) createdAt DateTime @default(now())
updatedAt DateTime @updatedAt updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
createdByUser User @relation("ConversationCreator", fields: [createdByUserId], references: [id], onDelete: Cascade) createdByUser User @relation("ConversationCreator", fields: [createdByUserId], references: [id], onDelete: Cascade)
messages ChatMessage[] messages AiMessage[]
@@index([teamId, updatedAt]) @@index([teamId, updatedAt])
@@index([createdByUserId]) @@index([createdByUserId])
@@map("ChatConversation")
} }
model ChatMessage { model AiMessage {
id String @id @default(cuid()) id String @id @default(cuid())
teamId String teamId String
conversationId String conversationId String
@@ -331,13 +332,14 @@ model ChatMessage {
planJson Json? planJson Json?
createdAt DateTime @default(now()) createdAt DateTime @default(now())
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
conversation ChatConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade) conversation AiConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade)
authorUser User? @relation("ChatAuthor", fields: [authorUserId], references: [id], onDelete: SetNull) authorUser User? @relation("ChatAuthor", fields: [authorUserId], references: [id], onDelete: SetNull)
@@index([createdAt]) @@index([createdAt])
@@index([teamId, createdAt]) @@index([teamId, createdAt])
@@index([conversationId, createdAt]) @@index([conversationId, createdAt])
@@map("ChatMessage")
} }
model FeedCard { model FeedCard {

View File

@@ -325,6 +325,24 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) {
businessConnectionId, businessConnectionId,
title: asString(n.chatTitle), title: asString(n.chatTitle),
}); });
const rawEnvelope = {
version: env.version,
source: "omni_chat.receiver",
provider: env.provider,
channel: env.channel,
direction,
providerEventId: env.providerEventId,
receivedAt: env.receivedAt,
occurredAt: occurredAt.toISOString(),
normalized: {
text,
threadExternalId: externalChatId,
contactExternalId,
businessConnectionId,
},
payloadNormalized: n,
payloadRaw: (env.payloadRaw ?? null) as Prisma.InputJsonValue,
} as Prisma.InputJsonValue;
if (env.providerMessageId) { if (env.providerMessageId) {
await prisma.omniMessage.upsert({ await prisma.omniMessage.upsert({
@@ -344,13 +362,13 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) {
text, text,
providerMessageId: env.providerMessageId, providerMessageId: env.providerMessageId,
providerUpdateId: String((n.updateId as string | null | undefined) ?? env.providerEventId), providerUpdateId: String((n.updateId as string | null | undefined) ?? env.providerEventId),
rawJson: (env.payloadRaw ?? null) as Prisma.InputJsonValue, rawJson: rawEnvelope,
occurredAt, occurredAt,
}, },
update: { update: {
text, text,
providerUpdateId: String((n.updateId as string | null | undefined) ?? env.providerEventId), providerUpdateId: String((n.updateId as string | null | undefined) ?? env.providerEventId),
rawJson: (env.payloadRaw ?? null) as Prisma.InputJsonValue, rawJson: rawEnvelope,
occurredAt, occurredAt,
}, },
}); });
@@ -366,7 +384,7 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) {
text, text,
providerMessageId: null, providerMessageId: null,
providerUpdateId: String((n.updateId as string | null | undefined) ?? env.providerEventId), providerUpdateId: String((n.updateId as string | null | undefined) ?? env.providerEventId),
rawJson: (env.payloadRaw ?? null) as Prisma.InputJsonValue, rawJson: rawEnvelope,
occurredAt, occurredAt,
}, },
}); });

View File

@@ -68,8 +68,8 @@ model Team {
contacts Contact[] contacts Contact[]
calendarEvents CalendarEvent[] calendarEvents CalendarEvent[]
deals Deal[] deals Deal[]
conversations ChatConversation[] aiConversations AiConversation[]
chatMessages ChatMessage[] aiMessages AiMessage[]
omniThreads OmniThread[] omniThreads OmniThread[]
omniMessages OmniMessage[] omniMessages OmniMessage[]
@@ -90,9 +90,9 @@ model User {
createdAt DateTime @default(now()) createdAt DateTime @default(now())
updatedAt DateTime @updatedAt updatedAt DateTime @updatedAt
memberships TeamMember[] memberships TeamMember[]
conversations ChatConversation[] @relation("ConversationCreator") aiConversations AiConversation[] @relation("ConversationCreator")
chatMessages ChatMessage[] @relation("ChatAuthor") aiMessages AiMessage[] @relation("ChatAuthor")
} }
model TeamMember { model TeamMember {
@@ -305,7 +305,7 @@ model DealStep {
@@index([status, dueAt]) @@index([status, dueAt])
} }
model ChatConversation { model AiConversation {
id String @id @default(cuid()) id String @id @default(cuid())
teamId String teamId String
createdByUserId String createdByUserId String
@@ -313,15 +313,16 @@ model ChatConversation {
createdAt DateTime @default(now()) createdAt DateTime @default(now())
updatedAt DateTime @updatedAt updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
createdByUser User @relation("ConversationCreator", fields: [createdByUserId], references: [id], onDelete: Cascade) createdByUser User @relation("ConversationCreator", fields: [createdByUserId], references: [id], onDelete: Cascade)
messages ChatMessage[] messages AiMessage[]
@@index([teamId, updatedAt]) @@index([teamId, updatedAt])
@@index([createdByUserId]) @@index([createdByUserId])
@@map("ChatConversation")
} }
model ChatMessage { model AiMessage {
id String @id @default(cuid()) id String @id @default(cuid())
teamId String teamId String
conversationId String conversationId String
@@ -331,13 +332,14 @@ model ChatMessage {
planJson Json? planJson Json?
createdAt DateTime @default(now()) createdAt DateTime @default(now())
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
conversation ChatConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade) conversation AiConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade)
authorUser User? @relation("ChatAuthor", fields: [authorUserId], references: [id], onDelete: SetNull) authorUser User? @relation("ChatAuthor", fields: [authorUserId], references: [id], onDelete: SetNull)
@@index([createdAt]) @@index([createdAt])
@@index([teamId, createdAt]) @@index([teamId, createdAt])
@@index([conversationId, createdAt]) @@index([conversationId, createdAt])
@@map("ChatMessage")
} }
model FeedCard { model FeedCard {

View File

@@ -62,6 +62,11 @@ function extractProviderMessageId(body: unknown): string | null {
return String(candidate); return String(candidate);
} }
function asObject(value: unknown): Record<string, unknown> {
if (!value || typeof value !== "object" || Array.isArray(value)) return {};
return value as Record<string, unknown>;
}
export function outboundDeliveryQueue() { export function outboundDeliveryQueue() {
return new Queue<OutboundDeliveryJob, unknown, "deliver">(OUTBOUND_DELIVERY_QUEUE_NAME, { return new Queue<OutboundDeliveryJob, unknown, "deliver">(OUTBOUND_DELIVERY_QUEUE_NAME, {
connection: redisConnectionFromEnv(), connection: redisConnectionFromEnv(),
@@ -77,16 +82,26 @@ export async function enqueueOutboundDelivery(input: OutboundDeliveryJob, opts?:
const q = outboundDeliveryQueue(); const q = outboundDeliveryQueue();
const payload = (input.payload ?? null) as Prisma.InputJsonValue; const payload = (input.payload ?? null) as Prisma.InputJsonValue;
const existing = await prisma.omniMessage.findUnique({
where: { id: input.omniMessageId },
select: { rawJson: true },
});
const raw = asObject(existing?.rawJson);
const rawQueue = asObject(raw.queue);
const rawDeliveryRequest = asObject(raw.deliveryRequest);
await prisma.omniMessage.update({ await prisma.omniMessage.update({
where: { id: input.omniMessageId }, where: { id: input.omniMessageId },
data: { data: {
status: "PENDING", status: "PENDING",
rawJson: { rawJson: {
...raw,
queue: { queue: {
...rawQueue,
queueName: OUTBOUND_DELIVERY_QUEUE_NAME, queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
enqueuedAt: new Date().toISOString(), enqueuedAt: new Date().toISOString(),
}, },
deliveryRequest: { deliveryRequest: {
...rawDeliveryRequest,
endpoint, endpoint,
method: input.method ?? "POST", method: input.method ?? "POST",
channel: input.channel ?? null, channel: input.channel ?? null,
@@ -149,18 +164,24 @@ export function startOutboundDeliveryWorker() {
} }
const providerMessageId = extractProviderMessageId(responseBody); const providerMessageId = extractProviderMessageId(responseBody);
const raw = asObject(msg.rawJson);
const rawQueue = asObject(raw.queue);
const rawDeliveryRequest = asObject(raw.deliveryRequest);
await prisma.omniMessage.update({ await prisma.omniMessage.update({
where: { id: msg.id }, where: { id: msg.id },
data: { data: {
status: "SENT", status: "SENT",
providerMessageId, providerMessageId,
rawJson: { rawJson: {
...raw,
queue: { queue: {
...rawQueue,
queueName: OUTBOUND_DELIVERY_QUEUE_NAME, queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
completedAt: new Date().toISOString(), completedAt: new Date().toISOString(),
attemptsMade: job.attemptsMade + 1, attemptsMade: job.attemptsMade + 1,
}, },
deliveryRequest: { deliveryRequest: {
...rawDeliveryRequest,
endpoint, endpoint,
method, method,
channel: job.data.channel ?? null, channel: job.data.channel ?? null,
@@ -180,17 +201,23 @@ export function startOutboundDeliveryWorker() {
typeof job.opts.attempts === "number" && job.attemptsMade + 1 >= job.opts.attempts; typeof job.opts.attempts === "number" && job.attemptsMade + 1 >= job.opts.attempts;
if (isLastAttempt) { if (isLastAttempt) {
const raw = asObject(msg.rawJson);
const rawQueue = asObject(raw.queue);
const rawDeliveryRequest = asObject(raw.deliveryRequest);
await prisma.omniMessage.update({ await prisma.omniMessage.update({
where: { id: msg.id }, where: { id: msg.id },
data: { data: {
status: "FAILED", status: "FAILED",
rawJson: { rawJson: {
...raw,
queue: { queue: {
...rawQueue,
queueName: OUTBOUND_DELIVERY_QUEUE_NAME, queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
failedAt: new Date().toISOString(), failedAt: new Date().toISOString(),
attemptsMade: job.attemptsMade + 1, attemptsMade: job.attemptsMade + 1,
}, },
deliveryRequest: { deliveryRequest: {
...rawDeliveryRequest,
endpoint, endpoint,
method, method,
channel: job.data.channel ?? null, channel: job.data.channel ?? null,