diff --git a/omni_chat/src/worker.ts b/omni_chat/src/worker.ts index bf6f5ca..a12eb39 100644 --- a/omni_chat/src/worker.ts +++ b/omni_chat/src/worker.ts @@ -9,7 +9,7 @@ type OmniInboundEnvelopeV1 = { idempotencyKey: string; provider: string; channel: "TELEGRAM" | "WHATSAPP" | "INSTAGRAM" | "PHONE" | "EMAIL" | "INTERNAL"; - direction: "IN"; + direction: "IN" | "OUT"; providerEventId: string; providerMessageId: string | null; eventType: string; @@ -27,6 +27,7 @@ type OmniInboundEnvelopeV1 = { }; export const RECEIVER_FLOW_QUEUE_NAME = (process.env.RECEIVER_FLOW_QUEUE_NAME || "receiver.flow").trim(); +const TELEGRAM_PLACEHOLDER_PREFIX = "Telegram "; function redisConnectionFromEnv(): ConnectionOptions { const raw = (process.env.REDIS_URL || "redis://localhost:6379").trim(); @@ -52,6 +53,83 @@ function parseOccurredAt(input: string | null | undefined) { return d; } +function asString(input: unknown) { + if (typeof input !== "string") return null; + const trimmed = input.trim(); + return trimmed || null; +} + +function safeDirection(input: unknown): "IN" | "OUT" { + return input === "OUT" ? "OUT" : "IN"; +} + +function isUniqueConstraintError(error: unknown) { + return error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2002"; +} + +type ContactProfile = { + displayName: string; + avatarUrl: string | null; +}; + +function buildContactProfile( + normalized: OmniInboundEnvelopeV1["payloadNormalized"], + externalContactId: string, +): ContactProfile { + const firstName = + asString(normalized.contactFirstName) ?? + asString(normalized.fromFirstName) ?? + asString(normalized.chatFirstName); + const lastName = + asString(normalized.contactLastName) ?? + asString(normalized.fromLastName) ?? + asString(normalized.chatLastName); + const username = + asString(normalized.contactUsername) ?? + asString(normalized.fromUsername) ?? + asString(normalized.chatUsername); + const title = asString(normalized.contactTitle) ?? asString(normalized.chatTitle); + + const fullName = [firstName, lastName].filter(Boolean).join(" "); + const displayName = + fullName || + (username ? `@${username.replace(/^@/, "")}` : null) || + title || + `${TELEGRAM_PLACEHOLDER_PREFIX}${externalContactId}`; + + return { + displayName, + avatarUrl: asString(normalized.contactAvatarUrl), + }; +} + +async function maybeHydrateContact(contactId: string, profile: ContactProfile) { + const current = await prisma.contact.findUnique({ + where: { id: contactId }, + select: { name: true, avatarUrl: true }, + }); + if (!current) return; + + const updates: Prisma.ContactUpdateInput = {}; + const currentName = asString(current.name); + const nextName = asString(profile.displayName); + + if (nextName && (!currentName || currentName.startsWith(TELEGRAM_PLACEHOLDER_PREFIX)) && currentName !== nextName) { + updates.name = nextName; + } + + const currentAvatar = asString(current.avatarUrl); + if (profile.avatarUrl && !currentAvatar) { + updates.avatarUrl = profile.avatarUrl; + } + + if (Object.keys(updates).length === 0) return; + await prisma.contact.update({ + where: { id: contactId }, + data: updates, + }); +} + async function resolveTeamId(env: OmniInboundEnvelopeV1) { const n = env.payloadNormalized ?? ({} as OmniInboundEnvelopeV1["payloadNormalized"]); const bcId = String(n.businessConnectionId ?? "").trim(); @@ -85,7 +163,11 @@ async function resolveTeamId(env: OmniInboundEnvelopeV1) { return demo?.id ?? null; } -async function resolveContact(input: { teamId: string; externalContactId: string }) { +async function resolveContact(input: { + teamId: string; + externalContactId: string; + profile: ContactProfile; +}) { const existingIdentity = await prisma.omniContactIdentity.findFirst({ where: { teamId: input.teamId, @@ -95,28 +177,48 @@ async function resolveContact(input: { teamId: string; externalContactId: string select: { contactId: true }, }); if (existingIdentity?.contactId) { + await maybeHydrateContact(existingIdentity.contactId, input.profile); return existingIdentity.contactId; } const contact = await prisma.contact.create({ data: { teamId: input.teamId, - name: `Telegram ${input.externalContactId}`, - company: "", - country: "", - location: "", + name: input.profile.displayName, + avatarUrl: input.profile.avatarUrl, + company: null, + country: null, + location: null, }, select: { id: true }, }); - await prisma.omniContactIdentity.create({ - data: { - teamId: input.teamId, - contactId: contact.id, - channel: "TELEGRAM", - externalId: input.externalContactId, - }, - }); + try { + await prisma.omniContactIdentity.create({ + data: { + teamId: input.teamId, + contactId: contact.id, + channel: "TELEGRAM", + externalId: input.externalContactId, + }, + }); + } catch (error) { + if (!isUniqueConstraintError(error)) throw error; + + const concurrentIdentity = await prisma.omniContactIdentity.findFirst({ + where: { + teamId: input.teamId, + channel: "TELEGRAM", + externalId: input.externalContactId, + }, + select: { contactId: true }, + }); + if (!concurrentIdentity?.contactId) throw error; + + await prisma.contact.delete({ where: { id: contact.id } }).catch(() => undefined); + await maybeHydrateContact(concurrentIdentity.contactId, input.profile); + return concurrentIdentity.contactId; + } return contact.id; } @@ -126,6 +228,7 @@ async function upsertThread(input: { contactId: string; externalChatId: string; businessConnectionId: string | null; + title: string | null; }) { const existing = await prisma.omniThread.findFirst({ where: { @@ -134,32 +237,60 @@ async function upsertThread(input: { externalChatId: input.externalChatId, businessConnectionId: input.businessConnectionId, }, - select: { id: true }, + select: { id: true, title: true }, }); if (existing) { + const data: Prisma.OmniThreadUpdateInput = { + contactId: input.contactId, + }; + if (input.title && !existing.title) { + data.title = input.title; + } + await prisma.omniThread.update({ where: { id: existing.id }, - data: { contactId: input.contactId }, + data, }); return existing; } - return prisma.omniThread.create({ - data: { - teamId: input.teamId, - contactId: input.contactId, - channel: "TELEGRAM", - externalChatId: input.externalChatId, - businessConnectionId: input.businessConnectionId, - title: null, - }, - select: { id: true }, - }); + try { + return await prisma.omniThread.create({ + data: { + teamId: input.teamId, + contactId: input.contactId, + channel: "TELEGRAM", + externalChatId: input.externalChatId, + businessConnectionId: input.businessConnectionId, + title: input.title, + }, + select: { id: true }, + }); + } catch (error) { + if (!isUniqueConstraintError(error)) throw error; + + const concurrentThread = await prisma.omniThread.findFirst({ + where: { + teamId: input.teamId, + channel: "TELEGRAM", + externalChatId: input.externalChatId, + businessConnectionId: input.businessConnectionId, + }, + select: { id: true }, + }); + if (!concurrentThread) throw error; + + await prisma.omniThread.update({ + where: { id: concurrentThread.id }, + data: { contactId: input.contactId }, + }); + return concurrentThread; + } } async function ingestInbound(env: OmniInboundEnvelopeV1) { - if (env.channel !== "TELEGRAM" || env.direction !== "IN") return; + if (env.channel !== "TELEGRAM") return; const teamId = await resolveTeamId(env); if (!teamId) { @@ -179,13 +310,20 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) { const businessConnectionId = String(n.businessConnectionId ?? "").trim() || null; const text = normalizeText(n.text); const occurredAt = parseOccurredAt(env.occurredAt); + const direction = safeDirection(env.direction); + const contactProfile = buildContactProfile(n, externalContactId); - const contactId = await resolveContact({ teamId, externalContactId }); + const contactId = await resolveContact({ + teamId, + externalContactId, + profile: contactProfile, + }); const thread = await upsertThread({ teamId, contactId, externalChatId, businessConnectionId, + title: asString(n.chatTitle), }); if (env.providerMessageId) { @@ -200,7 +338,7 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) { teamId, contactId, threadId: thread.id, - direction: "IN", + direction, channel: "TELEGRAM", status: "DELIVERED", text, @@ -222,7 +360,7 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) { teamId, contactId, threadId: thread.id, - direction: "IN", + direction, channel: "TELEGRAM", status: "DELIVERED", text, @@ -238,7 +376,7 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) { data: { contactId, kind: "MESSAGE", - direction: "IN", + direction, channel: "TELEGRAM", content: text, occurredAt, diff --git a/omni_inbound/src/telegram.ts b/omni_inbound/src/telegram.ts index be8b8f3..c8dc3ee 100644 --- a/omni_inbound/src/telegram.ts +++ b/omni_inbound/src/telegram.ts @@ -45,6 +45,29 @@ function cropText(value: unknown) { return value.slice(0, MAX_TEXT_LENGTH); } +function normalizeString(value: unknown) { + if (typeof value !== "string") return null; + const normalized = value.trim(); + return normalized || null; +} + +function detectDirection(message: JsonObject, chat: JsonObject, from: JsonObject): "IN" | "OUT" { + if (typeof message.outgoing === "boolean") return message.outgoing ? "OUT" : "IN"; + if (typeof message.is_outgoing === "boolean") return message.is_outgoing ? "OUT" : "IN"; + if (typeof message.out === "boolean") return message.out ? "OUT" : "IN"; + + const chatType = normalizeString(chat.type); + if (chatType === "private" && from.is_bot === true) return "OUT"; + + const chatId = chat.id != null ? String(chat.id) : null; + const fromId = from.id != null ? String(from.id) : null; + if (chatType === "private" && chatId && fromId && chatId !== fromId) { + return "OUT"; + } + + return "IN"; +} + function requireString(value: unknown, fallback: string) { const v = String(value ?? "").trim(); return v || fallback; @@ -72,6 +95,9 @@ export function parseTelegramBusinessUpdate(raw: unknown): OmniInboundEnvelopeV1 const chat = asObject(message.chat); const from = asObject(message.from); + const direction = detectDirection(message, chat, from); + const contactSource = direction === "OUT" && Object.keys(chat).length > 0 ? chat : from; + const fallbackContactSource = direction === "OUT" ? from : chat; const threadExternalId = chat.id != null @@ -80,7 +106,12 @@ export function parseTelegramBusinessUpdate(raw: unknown): OmniInboundEnvelopeV1 ? String(businessConnection.user_chat_id) : null; - const contactExternalId = from.id != null ? String(from.id) : null; + const contactExternalId = + contactSource.id != null + ? String(contactSource.id) + : fallbackContactSource.id != null + ? String(fallbackContactSource.id) + : null; const text = cropText(message.text) ?? cropText(message.caption); @@ -105,7 +136,7 @@ export function parseTelegramBusinessUpdate(raw: unknown): OmniInboundEnvelopeV1 idempotencyKey, provider: "telegram_business", channel: "TELEGRAM", - direction: "IN", + direction, providerEventId, providerMessageId, eventType, @@ -119,9 +150,18 @@ export function parseTelegramBusinessUpdate(raw: unknown): OmniInboundEnvelopeV1 businessConnectionId, updateId: updateId != null ? String(updateId) : null, chatTitle: typeof chat.title === "string" ? chat.title : null, + chatUsername: normalizeString(chat.username), + chatFirstName: normalizeString(chat.first_name), + chatLastName: normalizeString(chat.last_name), + contactUsername: normalizeString(contactSource.username), + contactFirstName: normalizeString(contactSource.first_name), + contactLastName: normalizeString(contactSource.last_name), + contactTitle: normalizeString(contactSource.title), + contactAvatarUrl: normalizeString(contactSource.photo_url), fromUsername: typeof from.username === "string" ? from.username : null, fromFirstName: typeof from.first_name === "string" ? from.first_name : null, fromLastName: typeof from.last_name === "string" ? from.last_name : null, + fromIsBot: from.is_bot === true, }, }; } diff --git a/omni_inbound/src/types.ts b/omni_inbound/src/types.ts index b343d3f..28f10b5 100644 --- a/omni_inbound/src/types.ts +++ b/omni_inbound/src/types.ts @@ -5,7 +5,7 @@ export type OmniInboundEnvelopeV1 = { idempotencyKey: string; provider: string; channel: OmniInboundChannel; - direction: "IN"; + direction: "IN" | "OUT"; providerEventId: string; providerMessageId: string | null; eventType: string;