diff --git a/frontend/prisma/schema.prisma b/frontend/prisma/schema.prisma index e344556..da6d02f 100644 --- a/frontend/prisma/schema.prisma +++ b/frontend/prisma/schema.prisma @@ -68,8 +68,8 @@ model Team { contacts Contact[] calendarEvents CalendarEvent[] deals Deal[] - conversations ChatConversation[] - chatMessages ChatMessage[] + aiConversations AiConversation[] + aiMessages AiMessage[] omniThreads OmniThread[] omniMessages OmniMessage[] @@ -90,9 +90,9 @@ model User { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt - memberships TeamMember[] - conversations ChatConversation[] @relation("ConversationCreator") - chatMessages ChatMessage[] @relation("ChatAuthor") + memberships TeamMember[] + aiConversations AiConversation[] @relation("ConversationCreator") + aiMessages AiMessage[] @relation("ChatAuthor") } model TeamMember { @@ -305,7 +305,7 @@ model DealStep { @@index([status, dueAt]) } -model ChatConversation { +model AiConversation { id String @id @default(cuid()) teamId String createdByUserId String @@ -313,15 +313,16 @@ model ChatConversation { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt - team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) - createdByUser User @relation("ConversationCreator", fields: [createdByUserId], references: [id], onDelete: Cascade) - messages ChatMessage[] + team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) + createdByUser User @relation("ConversationCreator", fields: [createdByUserId], references: [id], onDelete: Cascade) + messages AiMessage[] @@index([teamId, updatedAt]) @@index([createdByUserId]) + @@map("ChatConversation") } -model ChatMessage { +model AiMessage { id String @id @default(cuid()) teamId String conversationId String @@ -331,13 +332,14 @@ model ChatMessage { planJson Json? createdAt DateTime @default(now()) - team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) - conversation ChatConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade) - authorUser User? @relation("ChatAuthor", fields: [authorUserId], references: [id], onDelete: SetNull) + team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) + conversation AiConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade) + authorUser User? @relation("ChatAuthor", fields: [authorUserId], references: [id], onDelete: SetNull) @@index([createdAt]) @@index([teamId, createdAt]) @@index([conversationId, createdAt]) + @@map("ChatMessage") } model FeedCard { diff --git a/frontend/prisma/seed.mjs b/frontend/prisma/seed.mjs index 9256ab5..fc54ca4 100644 --- a/frontend/prisma/seed.mjs +++ b/frontend/prisma/seed.mjs @@ -123,7 +123,7 @@ async function main() { create: { teamId: team.id, userId: user.id, role: "OWNER" }, }); - const conversation = await prisma.chatConversation.upsert({ + const conversation = await prisma.aiConversation.upsert({ where: { id: `pilot-${team.id}` }, update: { title: "Пилот" }, create: { id: `pilot-${team.id}`, teamId: team.id, createdByUserId: user.id, title: "Пилот" }, @@ -136,7 +136,7 @@ async function main() { prisma.deal.deleteMany({ where: { teamId: team.id } }), prisma.calendarEvent.deleteMany({ where: { teamId: team.id } }), prisma.contactMessage.deleteMany({ where: { contact: { teamId: team.id } } }), - prisma.chatMessage.deleteMany({ where: { teamId: team.id, conversationId: conversation.id } }), + prisma.aiMessage.deleteMany({ where: { teamId: team.id, conversationId: conversation.id } }), prisma.omniMessage.deleteMany({ where: { teamId: team.id } }), prisma.omniThread.deleteMany({ where: { teamId: team.id } }), prisma.omniContactIdentity.deleteMany({ where: { teamId: team.id } }), diff --git a/frontend/server/agent/crmAgent.ts b/frontend/server/agent/crmAgent.ts index 227d085..5436e22 100644 --- a/frontend/server/agent/crmAgent.ts +++ b/frontend/server/agent/crmAgent.ts @@ -255,7 +255,7 @@ export async function runCrmAgentFor( ); } -export async function persistChatMessage(input: { +export async function persistAiMessage(input: { role: ChatRole; text: string; plan?: string[]; @@ -279,7 +279,7 @@ export async function persistChatMessage(input: { authorUserId?: string | null; }) { const hasStoredPayload = Boolean(input.changeSet || input.messageKind); - const data: Prisma.ChatMessageCreateInput = { + const data: Prisma.AiMessageCreateInput = { team: { connect: { id: input.teamId } }, conversation: { connect: { id: input.conversationId } }, authorUser: input.authorUserId ? { connect: { id: input.authorUserId } } : undefined, @@ -292,5 +292,5 @@ export async function persistChatMessage(input: { } as any) : undefined, }; - return prisma.chatMessage.create({ data }); + return prisma.aiMessage.create({ data }); } diff --git a/frontend/server/api/pilot-chat.post.ts b/frontend/server/api/pilot-chat.post.ts index b315267..165b65d 100644 --- a/frontend/server/api/pilot-chat.post.ts +++ b/frontend/server/api/pilot-chat.post.ts @@ -3,7 +3,7 @@ import { createUIMessageStream, createUIMessageStreamResponse } from "ai"; import { getAuthContext } from "../utils/auth"; import { prisma } from "../utils/prisma"; import { buildChangeSet, captureSnapshot } from "../utils/changeSet"; -import { persistChatMessage, runCrmAgentFor, type AgentTraceEvent } from "../agent/crmAgent"; +import { persistAiMessage, runCrmAgentFor, type AgentTraceEvent } from "../agent/crmAgent"; import type { PilotContextPayload } from "../agent/crmAgent"; import type { ChangeSet } from "../utils/changeSet"; @@ -143,7 +143,7 @@ export default defineEventHandler(async (event) => { try { const snapshotBefore = await captureSnapshot(prisma, auth.teamId); - await persistChatMessage({ + await persistAiMessage({ teamId: auth.teamId, conversationId: auth.conversationId, authorUserId: auth.userId, @@ -177,7 +177,7 @@ export default defineEventHandler(async (event) => { const snapshotAfter = await captureSnapshot(prisma, auth.teamId); const changeSet = buildChangeSet(snapshotBefore, snapshotAfter); - await persistChatMessage({ + await persistAiMessage({ teamId: auth.teamId, conversationId: auth.conversationId, authorUserId: null, @@ -190,7 +190,7 @@ export default defineEventHandler(async (event) => { }); if (changeSet) { - await persistChatMessage({ + await persistAiMessage({ teamId: auth.teamId, conversationId: auth.conversationId, authorUserId: null, @@ -212,7 +212,7 @@ export default defineEventHandler(async (event) => { } catch (error: any) { const errorText = String(error?.message ?? error); - await persistChatMessage({ + await persistAiMessage({ teamId: auth.teamId, conversationId: auth.conversationId, authorUserId: null, diff --git a/frontend/server/graphql/schema.ts b/frontend/server/graphql/schema.ts index 1d896ce..f7eef95 100644 --- a/frontend/server/graphql/schema.ts +++ b/frontend/server/graphql/schema.ts @@ -4,7 +4,7 @@ import type { AuthContext } from "../utils/auth"; import { clearAuthSession, setSession } from "../utils/auth"; import { prisma } from "../utils/prisma"; import { normalizePhone, verifyPassword } from "../utils/password"; -import { persistChatMessage, runCrmAgentFor } from "../agent/crmAgent"; +import { persistAiMessage, runCrmAgentFor } from "../agent/crmAgent"; import { buildChangeSet, captureSnapshot, rollbackChangeSet, rollbackChangeSetItems } from "../utils/changeSet"; import type { ChangeSet } from "../utils/changeSet"; import { enqueueTelegramSend } from "../queues/telegramSend"; @@ -38,6 +38,30 @@ function toDbChannel(channel: string) { return "PHONE"; } +function asObject(value: unknown): Record { + if (!value || typeof value !== "object" || Array.isArray(value)) return {}; + return value as Record; +} + +function readNestedString(obj: Record, path: string[]): string { + let current: unknown = obj; + for (const segment of path) { + if (!current || typeof current !== "object" || Array.isArray(current)) return ""; + current = (current as Record)[segment]; + } + return typeof current === "string" ? current.trim() : ""; +} + +function extractOmniNormalizedText(rawJson: unknown, fallbackText = "") { + const raw = asObject(rawJson); + return ( + readNestedString(raw, ["normalized", "text"]) || + readNestedString(raw, ["payloadNormalized", "text"]) || + readNestedString(raw, ["deliveryRequest", "payload", "text"]) || + String(fallbackText ?? "").trim() + ); +} + async function loginWithPassword(event: H3Event, phoneInput: string, passwordInput: string) { const phone = normalizePhone(phoneInput); const password = (passwordInput ?? "").trim(); @@ -69,11 +93,11 @@ async function loginWithPassword(event: H3Event, phoneInput: string, passwordInp } const conversation = - (await prisma.chatConversation.findFirst({ + (await prisma.aiConversation.findFirst({ where: { teamId: membership.teamId, createdByUserId: user.id }, orderBy: { createdAt: "desc" }, })) || - (await prisma.chatConversation.create({ + (await prisma.aiConversation.create({ data: { teamId: membership.teamId, createdByUserId: user.id, title: "Pilot" }, })); @@ -91,7 +115,7 @@ async function getAuthPayload(auth: AuthContext | null) { const [user, team, conv] = await Promise.all([ prisma.user.findUnique({ where: { id: ctx.userId } }), prisma.team.findUnique({ where: { id: ctx.teamId } }), - prisma.chatConversation.findUnique({ + prisma.aiConversation.findUnique({ where: { id: ctx.conversationId }, include: { messages: { orderBy: { createdAt: "desc" }, take: 1, select: { text: true, createdAt: true } } }, }), @@ -130,7 +154,7 @@ function defaultConversationTitle(input?: string | null) { async function getChatConversations(auth: AuthContext | null) { const ctx = requireAuth(auth); - const items = await prisma.chatConversation.findMany({ + const items = await prisma.aiConversation.findMany({ where: { teamId: ctx.teamId, createdByUserId: ctx.userId }, include: { messages: { @@ -161,7 +185,7 @@ async function getChatConversations(auth: AuthContext | null) { async function createChatConversation(auth: AuthContext | null, event: H3Event, titleInput?: string | null) { const ctx = requireAuth(auth); - const conversation = await prisma.chatConversation.create({ + const conversation = await prisma.aiConversation.create({ data: { teamId: ctx.teamId, createdByUserId: ctx.userId, @@ -190,7 +214,7 @@ async function selectChatConversation(auth: AuthContext | null, event: H3Event, const convId = (id ?? "").trim(); if (!convId) throw new Error("id is required"); - const conversation = await prisma.chatConversation.findFirst({ + const conversation = await prisma.aiConversation.findFirst({ where: { id: convId, teamId: ctx.teamId, @@ -215,7 +239,7 @@ async function archiveChatConversation(auth: AuthContext | null, event: H3Event, const convId = (id ?? "").trim(); if (!convId) throw new Error("id is required"); - const conversation = await prisma.chatConversation.findFirst({ + const conversation = await prisma.aiConversation.findFirst({ where: { id: convId, teamId: ctx.teamId, @@ -227,13 +251,13 @@ async function archiveChatConversation(auth: AuthContext | null, event: H3Event, if (!conversation) throw new Error("conversation not found"); const nextConversationId = await prisma.$transaction(async (tx) => { - await tx.chatConversation.delete({ where: { id: conversation.id } }); + await tx.aiConversation.delete({ where: { id: conversation.id } }); if (ctx.conversationId !== conversation.id) { return ctx.conversationId; } - const created = await tx.chatConversation.create({ + const created = await tx.aiConversation.create({ data: { teamId: ctx.teamId, createdByUserId: ctx.userId, title: "Pilot" }, select: { id: true }, }); @@ -251,7 +275,7 @@ async function archiveChatConversation(auth: AuthContext | null, event: H3Event, async function getChatMessages(auth: AuthContext | null) { const ctx = requireAuth(auth); - const items = await prisma.chatMessage.findMany({ + const items = await prisma.aiMessage.findMany({ where: { teamId: ctx.teamId, conversationId: ctx.conversationId }, orderBy: { createdAt: "asc" }, take: 200, @@ -363,6 +387,7 @@ async function getDashboard(auth: AuthContext | null) { channel: string; direction: string; text: string; + rawJson: unknown; status: string; occurredAt: Date; updatedAt: Date; @@ -390,6 +415,7 @@ async function getDashboard(auth: AuthContext | null) { channel: true, direction: true, text: true, + rawJson: true, status: true, occurredAt: true, updatedAt: true, @@ -421,7 +447,8 @@ async function getDashboard(auth: AuthContext | null) { const omniByKey = new Map(); for (const row of omniMessagesRaw) { - const key = [row.contactId, row.channel, row.direction, row.text.trim()].join("|"); + const normalizedText = extractOmniNormalizedText(row.rawJson, row.text); + const key = [row.contactId, row.channel, row.direction, normalizedText].join("|"); if (!omniByKey.has(key)) omniByKey.set(key, []); omniByKey.get(key)?.push(row); } @@ -703,7 +730,19 @@ async function createCommunication(auth: AuthContext | null, input: { providerMessageId: null, providerUpdateId: null, rawJson: { + version: 1, source: "graphql.createCommunication", + provider: "telegram_business", + normalized: { + channel: "TELEGRAM", + direction: "OUT", + text: content, + }, + payloadNormalized: { + contactId: contact.id, + threadId: thread.id, + text: content, + }, enqueuedAt: new Date().toISOString(), }, occurredAt, @@ -715,14 +754,22 @@ async function createCommunication(auth: AuthContext | null, input: { await enqueueTelegramSend({ omniMessageId: omniMessage.id }); } catch (error) { const message = error instanceof Error ? error.message : String(error); + const existingOmni = await prisma.omniMessage.findUnique({ + where: { id: omniMessage.id }, + select: { rawJson: true }, + }); await prisma.omniMessage.update({ where: { id: omniMessage.id }, data: { status: "FAILED", rawJson: { + ...asObject(existingOmni?.rawJson), source: "graphql.createCommunication", - deliveryError: { message }, - failedAt: new Date().toISOString(), + delivery: { + status: "FAILED", + error: { message }, + failedAt: new Date().toISOString(), + }, }, }, }).catch(() => undefined); @@ -859,7 +906,7 @@ function renderChangeSetSummary(changeSet: ChangeSet): string { async function findLatestChangeCarrierMessage(auth: AuthContext | null) { const ctx = requireAuth(auth); - const items = await prisma.chatMessage.findMany({ + const items = await prisma.aiMessage.findMany({ where: { teamId: ctx.teamId, conversationId: ctx.conversationId, @@ -883,7 +930,7 @@ async function findChangeCarrierMessageByChangeSetId(auth: AuthContext | null, c const targetId = String(changeSetId ?? "").trim(); if (!targetId) return null; - const items = await prisma.chatMessage.findMany({ + const items = await prisma.aiMessage.findMany({ where: { teamId: ctx.teamId, conversationId: ctx.conversationId, @@ -919,7 +966,7 @@ async function confirmLatestChangeSet(auth: AuthContext | null) { }, }; - await prisma.chatMessage.update({ + await prisma.aiMessage.update({ where: { id: item.id }, data: { planJson: next as any }, }); @@ -951,7 +998,7 @@ async function rollbackLatestChangeSet(auth: AuthContext | null) { }, }; - await prisma.chatMessage.update({ + await prisma.aiMessage.update({ where: { id: item.id }, data: { planJson: next as any }, }); @@ -994,7 +1041,7 @@ async function rollbackChangeSetItemsMutation(auth: AuthContext | null, changeSe }, }; - await prisma.chatMessage.update({ + await prisma.aiMessage.update({ where: { id: item.id }, data: { planJson: next as any }, }); @@ -1010,7 +1057,7 @@ async function sendPilotMessage(auth: AuthContext | null, textInput: string) { const snapshotBefore = await captureSnapshot(prisma, ctx.teamId); - await persistChatMessage({ + await persistAiMessage({ teamId: ctx.teamId, conversationId: ctx.conversationId, authorUserId: ctx.userId, @@ -1034,7 +1081,7 @@ async function sendPilotMessage(auth: AuthContext | null, textInput: string) { const snapshotAfter = await captureSnapshot(prisma, ctx.teamId); const changeSet = buildChangeSet(snapshotBefore, snapshotAfter); - await persistChatMessage({ + await persistAiMessage({ teamId: ctx.teamId, conversationId: ctx.conversationId, authorUserId: null, @@ -1047,7 +1094,7 @@ async function sendPilotMessage(auth: AuthContext | null, textInput: string) { }); if (changeSet) { - await persistChatMessage({ + await persistAiMessage({ teamId: ctx.teamId, conversationId: ctx.conversationId, authorUserId: null, @@ -1070,7 +1117,7 @@ async function logPilotNote(auth: AuthContext | null, textInput: string) { const text = (textInput ?? "").trim(); if (!text) throw new Error("text is required"); - await persistChatMessage({ + await persistAiMessage({ teamId: ctx.teamId, conversationId: ctx.conversationId, authorUserId: null, diff --git a/frontend/server/queues/outboundDelivery.ts b/frontend/server/queues/outboundDelivery.ts index c7d62a4..0fac3c5 100644 --- a/frontend/server/queues/outboundDelivery.ts +++ b/frontend/server/queues/outboundDelivery.ts @@ -62,6 +62,11 @@ function extractProviderMessageId(body: unknown): string | null { return String(candidate); } +function asObject(value: unknown): Record { + if (!value || typeof value !== "object" || Array.isArray(value)) return {}; + return value as Record; +} + export function outboundDeliveryQueue() { return new Queue(OUTBOUND_DELIVERY_QUEUE_NAME, { connection: redisConnectionFromEnv(), @@ -77,17 +82,27 @@ export async function enqueueOutboundDelivery(input: OutboundDeliveryJob, opts?: const q = outboundDeliveryQueue(); const payload = (input.payload ?? null) as Prisma.InputJsonValue; + const existing = await prisma.omniMessage.findUnique({ + where: { id: input.omniMessageId }, + select: { rawJson: true }, + }); + const raw = asObject(existing?.rawJson); + const rawQueue = asObject(raw.queue); + const rawDeliveryRequest = asObject(raw.deliveryRequest); // Keep source message in pending before actual send starts. await prisma.omniMessage.update({ where: { id: input.omniMessageId }, data: { status: "PENDING", rawJson: { + ...raw, queue: { + ...rawQueue, queueName: OUTBOUND_DELIVERY_QUEUE_NAME, enqueuedAt: new Date().toISOString(), }, deliveryRequest: { + ...rawDeliveryRequest, endpoint, method: input.method ?? "POST", channel: input.channel ?? null, @@ -151,18 +166,24 @@ export function startOutboundDeliveryWorker() { } const providerMessageId = extractProviderMessageId(responseBody); + const raw = asObject(msg.rawJson); + const rawQueue = asObject(raw.queue); + const rawDeliveryRequest = asObject(raw.deliveryRequest); await prisma.omniMessage.update({ where: { id: msg.id }, data: { status: "SENT", providerMessageId, rawJson: { + ...raw, queue: { + ...rawQueue, queueName: OUTBOUND_DELIVERY_QUEUE_NAME, completedAt: new Date().toISOString(), attemptsMade: job.attemptsMade + 1, }, deliveryRequest: { + ...rawDeliveryRequest, endpoint, method, channel: job.data.channel ?? null, @@ -182,17 +203,23 @@ export function startOutboundDeliveryWorker() { typeof job.opts.attempts === "number" && job.attemptsMade + 1 >= job.opts.attempts; if (isLastAttempt) { + const raw = asObject(msg.rawJson); + const rawQueue = asObject(raw.queue); + const rawDeliveryRequest = asObject(raw.deliveryRequest); await prisma.omniMessage.update({ where: { id: msg.id }, data: { status: "FAILED", rawJson: { + ...raw, queue: { + ...rawQueue, queueName: OUTBOUND_DELIVERY_QUEUE_NAME, failedAt: new Date().toISOString(), attemptsMade: job.attemptsMade + 1, }, deliveryRequest: { + ...rawDeliveryRequest, endpoint, method, channel: job.data.channel ?? null, diff --git a/frontend/server/queues/telegramSend.ts b/frontend/server/queues/telegramSend.ts index 0812c62..ce1eac5 100644 --- a/frontend/server/queues/telegramSend.ts +++ b/frontend/server/queues/telegramSend.ts @@ -7,6 +7,20 @@ type TelegramSendJob = { omniMessageId: string; }; +function asObject(value: unknown): Record { + if (!value || typeof value !== "object" || Array.isArray(value)) return {}; + return value as Record; +} + +function readNestedString(obj: Record, path: string[]): string { + let current: unknown = obj; + for (const segment of path) { + if (!current || typeof current !== "object" || Array.isArray(current)) return ""; + current = (current as Record)[segment]; + } + return typeof current === "string" ? current.trim() : ""; +} + export async function enqueueTelegramSend(input: TelegramSendJob, opts?: JobsOptions) { const msg = await prisma.omniMessage.findUnique({ where: { id: input.omniMessageId }, @@ -16,12 +30,20 @@ export async function enqueueTelegramSend(input: TelegramSendJob, opts?: JobsOpt if (msg.channel !== "TELEGRAM" || msg.direction !== "OUT") { throw new Error(`Invalid omni message for telegram send: ${msg.id}`); } + const raw = asObject(msg.rawJson); + const text = + readNestedString(raw, ["normalized", "text"]) || + readNestedString(raw, ["payloadNormalized", "text"]) || + msg.text; + if (!text) { + throw new Error(`Omni message has empty text payload: ${msg.id}`); + } const token = requireTelegramBotToken(); const endpoint = `${telegramApiBase()}/bot${token}/sendMessage`; const payload = { chat_id: msg.thread.externalChatId, - text: msg.text, + text, ...(msg.thread.businessConnectionId ? { business_connection_id: msg.thread.businessConnectionId } : {}), }; diff --git a/frontend/server/routes/ws/crm-updates.ts b/frontend/server/routes/ws/crm-updates.ts index f1d6df1..900b23c 100644 --- a/frontend/server/routes/ws/crm-updates.ts +++ b/frontend/server/routes/ws/crm-updates.ts @@ -66,7 +66,7 @@ async function validateSessionFromPeer(peer: any) { const [user, team, conv] = await Promise.all([ prisma.user.findUnique({ where: { id: userId }, select: { id: true } }), prisma.team.findUnique({ where: { id: teamId }, select: { id: true } }), - prisma.chatConversation.findFirst({ + prisma.aiConversation.findFirst({ where: { id: conversationId, teamId, createdByUserId: userId }, select: { id: true }, }), diff --git a/frontend/server/utils/auth.ts b/frontend/server/utils/auth.ts index 28ec656..699da70 100644 --- a/frontend/server/utils/auth.ts +++ b/frontend/server/utils/auth.ts @@ -64,7 +64,7 @@ export async function getAuthContext(event: H3Event): Promise { throw createError({ statusCode: 401, statusMessage: "Unauthorized" }); } - const conv = await prisma.chatConversation.findFirst({ + const conv = await prisma.aiConversation.findFirst({ where: { id: conversationId, teamId: team.id, createdByUserId: user.id }, }); @@ -92,7 +92,7 @@ export async function ensureDemoAuth() { update: {}, create: { teamId: team.id, userId: user.id, role: "OWNER" }, }); - const conv = await prisma.chatConversation.upsert({ + const conv = await prisma.aiConversation.upsert({ where: { id: `pilot-${team.id}` }, update: {}, create: { id: `pilot-${team.id}`, teamId: team.id, createdByUserId: user.id, title: "Pilot" }, diff --git a/omni_chat/prisma/schema.prisma b/omni_chat/prisma/schema.prisma index e344556..da6d02f 100644 --- a/omni_chat/prisma/schema.prisma +++ b/omni_chat/prisma/schema.prisma @@ -68,8 +68,8 @@ model Team { contacts Contact[] calendarEvents CalendarEvent[] deals Deal[] - conversations ChatConversation[] - chatMessages ChatMessage[] + aiConversations AiConversation[] + aiMessages AiMessage[] omniThreads OmniThread[] omniMessages OmniMessage[] @@ -90,9 +90,9 @@ model User { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt - memberships TeamMember[] - conversations ChatConversation[] @relation("ConversationCreator") - chatMessages ChatMessage[] @relation("ChatAuthor") + memberships TeamMember[] + aiConversations AiConversation[] @relation("ConversationCreator") + aiMessages AiMessage[] @relation("ChatAuthor") } model TeamMember { @@ -305,7 +305,7 @@ model DealStep { @@index([status, dueAt]) } -model ChatConversation { +model AiConversation { id String @id @default(cuid()) teamId String createdByUserId String @@ -313,15 +313,16 @@ model ChatConversation { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt - team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) - createdByUser User @relation("ConversationCreator", fields: [createdByUserId], references: [id], onDelete: Cascade) - messages ChatMessage[] + team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) + createdByUser User @relation("ConversationCreator", fields: [createdByUserId], references: [id], onDelete: Cascade) + messages AiMessage[] @@index([teamId, updatedAt]) @@index([createdByUserId]) + @@map("ChatConversation") } -model ChatMessage { +model AiMessage { id String @id @default(cuid()) teamId String conversationId String @@ -331,13 +332,14 @@ model ChatMessage { planJson Json? createdAt DateTime @default(now()) - team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) - conversation ChatConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade) - authorUser User? @relation("ChatAuthor", fields: [authorUserId], references: [id], onDelete: SetNull) + team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) + conversation AiConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade) + authorUser User? @relation("ChatAuthor", fields: [authorUserId], references: [id], onDelete: SetNull) @@index([createdAt]) @@index([teamId, createdAt]) @@index([conversationId, createdAt]) + @@map("ChatMessage") } model FeedCard { diff --git a/omni_chat/src/worker.ts b/omni_chat/src/worker.ts index a12eb39..74b3505 100644 --- a/omni_chat/src/worker.ts +++ b/omni_chat/src/worker.ts @@ -325,6 +325,24 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) { businessConnectionId, title: asString(n.chatTitle), }); + const rawEnvelope = { + version: env.version, + source: "omni_chat.receiver", + provider: env.provider, + channel: env.channel, + direction, + providerEventId: env.providerEventId, + receivedAt: env.receivedAt, + occurredAt: occurredAt.toISOString(), + normalized: { + text, + threadExternalId: externalChatId, + contactExternalId, + businessConnectionId, + }, + payloadNormalized: n, + payloadRaw: (env.payloadRaw ?? null) as Prisma.InputJsonValue, + } as Prisma.InputJsonValue; if (env.providerMessageId) { await prisma.omniMessage.upsert({ @@ -344,13 +362,13 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) { text, providerMessageId: env.providerMessageId, providerUpdateId: String((n.updateId as string | null | undefined) ?? env.providerEventId), - rawJson: (env.payloadRaw ?? null) as Prisma.InputJsonValue, + rawJson: rawEnvelope, occurredAt, }, update: { text, providerUpdateId: String((n.updateId as string | null | undefined) ?? env.providerEventId), - rawJson: (env.payloadRaw ?? null) as Prisma.InputJsonValue, + rawJson: rawEnvelope, occurredAt, }, }); @@ -366,7 +384,7 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) { text, providerMessageId: null, providerUpdateId: String((n.updateId as string | null | undefined) ?? env.providerEventId), - rawJson: (env.payloadRaw ?? null) as Prisma.InputJsonValue, + rawJson: rawEnvelope, occurredAt, }, }); diff --git a/omni_outbound/prisma/schema.prisma b/omni_outbound/prisma/schema.prisma index e344556..da6d02f 100644 --- a/omni_outbound/prisma/schema.prisma +++ b/omni_outbound/prisma/schema.prisma @@ -68,8 +68,8 @@ model Team { contacts Contact[] calendarEvents CalendarEvent[] deals Deal[] - conversations ChatConversation[] - chatMessages ChatMessage[] + aiConversations AiConversation[] + aiMessages AiMessage[] omniThreads OmniThread[] omniMessages OmniMessage[] @@ -90,9 +90,9 @@ model User { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt - memberships TeamMember[] - conversations ChatConversation[] @relation("ConversationCreator") - chatMessages ChatMessage[] @relation("ChatAuthor") + memberships TeamMember[] + aiConversations AiConversation[] @relation("ConversationCreator") + aiMessages AiMessage[] @relation("ChatAuthor") } model TeamMember { @@ -305,7 +305,7 @@ model DealStep { @@index([status, dueAt]) } -model ChatConversation { +model AiConversation { id String @id @default(cuid()) teamId String createdByUserId String @@ -313,15 +313,16 @@ model ChatConversation { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt - team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) - createdByUser User @relation("ConversationCreator", fields: [createdByUserId], references: [id], onDelete: Cascade) - messages ChatMessage[] + team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) + createdByUser User @relation("ConversationCreator", fields: [createdByUserId], references: [id], onDelete: Cascade) + messages AiMessage[] @@index([teamId, updatedAt]) @@index([createdByUserId]) + @@map("ChatConversation") } -model ChatMessage { +model AiMessage { id String @id @default(cuid()) teamId String conversationId String @@ -331,13 +332,14 @@ model ChatMessage { planJson Json? createdAt DateTime @default(now()) - team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) - conversation ChatConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade) - authorUser User? @relation("ChatAuthor", fields: [authorUserId], references: [id], onDelete: SetNull) + team Team @relation(fields: [teamId], references: [id], onDelete: Cascade) + conversation AiConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade) + authorUser User? @relation("ChatAuthor", fields: [authorUserId], references: [id], onDelete: SetNull) @@index([createdAt]) @@index([teamId, createdAt]) @@index([conversationId, createdAt]) + @@map("ChatMessage") } model FeedCard { diff --git a/omni_outbound/src/queues/outboundDelivery.ts b/omni_outbound/src/queues/outboundDelivery.ts index 229be49..8b7702a 100644 --- a/omni_outbound/src/queues/outboundDelivery.ts +++ b/omni_outbound/src/queues/outboundDelivery.ts @@ -62,6 +62,11 @@ function extractProviderMessageId(body: unknown): string | null { return String(candidate); } +function asObject(value: unknown): Record { + if (!value || typeof value !== "object" || Array.isArray(value)) return {}; + return value as Record; +} + export function outboundDeliveryQueue() { return new Queue(OUTBOUND_DELIVERY_QUEUE_NAME, { connection: redisConnectionFromEnv(), @@ -77,16 +82,26 @@ export async function enqueueOutboundDelivery(input: OutboundDeliveryJob, opts?: const q = outboundDeliveryQueue(); const payload = (input.payload ?? null) as Prisma.InputJsonValue; + const existing = await prisma.omniMessage.findUnique({ + where: { id: input.omniMessageId }, + select: { rawJson: true }, + }); + const raw = asObject(existing?.rawJson); + const rawQueue = asObject(raw.queue); + const rawDeliveryRequest = asObject(raw.deliveryRequest); await prisma.omniMessage.update({ where: { id: input.omniMessageId }, data: { status: "PENDING", rawJson: { + ...raw, queue: { + ...rawQueue, queueName: OUTBOUND_DELIVERY_QUEUE_NAME, enqueuedAt: new Date().toISOString(), }, deliveryRequest: { + ...rawDeliveryRequest, endpoint, method: input.method ?? "POST", channel: input.channel ?? null, @@ -149,18 +164,24 @@ export function startOutboundDeliveryWorker() { } const providerMessageId = extractProviderMessageId(responseBody); + const raw = asObject(msg.rawJson); + const rawQueue = asObject(raw.queue); + const rawDeliveryRequest = asObject(raw.deliveryRequest); await prisma.omniMessage.update({ where: { id: msg.id }, data: { status: "SENT", providerMessageId, rawJson: { + ...raw, queue: { + ...rawQueue, queueName: OUTBOUND_DELIVERY_QUEUE_NAME, completedAt: new Date().toISOString(), attemptsMade: job.attemptsMade + 1, }, deliveryRequest: { + ...rawDeliveryRequest, endpoint, method, channel: job.data.channel ?? null, @@ -180,17 +201,23 @@ export function startOutboundDeliveryWorker() { typeof job.opts.attempts === "number" && job.attemptsMade + 1 >= job.opts.attempts; if (isLastAttempt) { + const raw = asObject(msg.rawJson); + const rawQueue = asObject(raw.queue); + const rawDeliveryRequest = asObject(raw.deliveryRequest); await prisma.omniMessage.update({ where: { id: msg.id }, data: { status: "FAILED", rawJson: { + ...raw, queue: { + ...rawQueue, queueName: OUTBOUND_DELIVERY_QUEUE_NAME, failedAt: new Date().toISOString(), attemptsMade: job.attemptsMade + 1, }, deliveryRequest: { + ...rawDeliveryRequest, endpoint, method, channel: job.data.channel ?? null,