Fix Telegram direction mapping and contact hydration

This commit is contained in:
Ruslan Bakiev
2026-02-23 07:46:06 +07:00
parent 38fcb1bfcc
commit 1aad1d009c
3 changed files with 213 additions and 35 deletions

View File

@@ -9,7 +9,7 @@ type OmniInboundEnvelopeV1 = {
idempotencyKey: string;
provider: string;
channel: "TELEGRAM" | "WHATSAPP" | "INSTAGRAM" | "PHONE" | "EMAIL" | "INTERNAL";
direction: "IN";
direction: "IN" | "OUT";
providerEventId: string;
providerMessageId: string | null;
eventType: string;
@@ -27,6 +27,7 @@ type OmniInboundEnvelopeV1 = {
};
export const RECEIVER_FLOW_QUEUE_NAME = (process.env.RECEIVER_FLOW_QUEUE_NAME || "receiver.flow").trim();
const TELEGRAM_PLACEHOLDER_PREFIX = "Telegram ";
function redisConnectionFromEnv(): ConnectionOptions {
const raw = (process.env.REDIS_URL || "redis://localhost:6379").trim();
@@ -52,6 +53,83 @@ function parseOccurredAt(input: string | null | undefined) {
return d;
}
function asString(input: unknown) {
if (typeof input !== "string") return null;
const trimmed = input.trim();
return trimmed || null;
}
function safeDirection(input: unknown): "IN" | "OUT" {
return input === "OUT" ? "OUT" : "IN";
}
function isUniqueConstraintError(error: unknown) {
return error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2002";
}
type ContactProfile = {
displayName: string;
avatarUrl: string | null;
};
function buildContactProfile(
normalized: OmniInboundEnvelopeV1["payloadNormalized"],
externalContactId: string,
): ContactProfile {
const firstName =
asString(normalized.contactFirstName) ??
asString(normalized.fromFirstName) ??
asString(normalized.chatFirstName);
const lastName =
asString(normalized.contactLastName) ??
asString(normalized.fromLastName) ??
asString(normalized.chatLastName);
const username =
asString(normalized.contactUsername) ??
asString(normalized.fromUsername) ??
asString(normalized.chatUsername);
const title = asString(normalized.contactTitle) ?? asString(normalized.chatTitle);
const fullName = [firstName, lastName].filter(Boolean).join(" ");
const displayName =
fullName ||
(username ? `@${username.replace(/^@/, "")}` : null) ||
title ||
`${TELEGRAM_PLACEHOLDER_PREFIX}${externalContactId}`;
return {
displayName,
avatarUrl: asString(normalized.contactAvatarUrl),
};
}
async function maybeHydrateContact(contactId: string, profile: ContactProfile) {
const current = await prisma.contact.findUnique({
where: { id: contactId },
select: { name: true, avatarUrl: true },
});
if (!current) return;
const updates: Prisma.ContactUpdateInput = {};
const currentName = asString(current.name);
const nextName = asString(profile.displayName);
if (nextName && (!currentName || currentName.startsWith(TELEGRAM_PLACEHOLDER_PREFIX)) && currentName !== nextName) {
updates.name = nextName;
}
const currentAvatar = asString(current.avatarUrl);
if (profile.avatarUrl && !currentAvatar) {
updates.avatarUrl = profile.avatarUrl;
}
if (Object.keys(updates).length === 0) return;
await prisma.contact.update({
where: { id: contactId },
data: updates,
});
}
async function resolveTeamId(env: OmniInboundEnvelopeV1) {
const n = env.payloadNormalized ?? ({} as OmniInboundEnvelopeV1["payloadNormalized"]);
const bcId = String(n.businessConnectionId ?? "").trim();
@@ -85,7 +163,11 @@ async function resolveTeamId(env: OmniInboundEnvelopeV1) {
return demo?.id ?? null;
}
async function resolveContact(input: { teamId: string; externalContactId: string }) {
async function resolveContact(input: {
teamId: string;
externalContactId: string;
profile: ContactProfile;
}) {
const existingIdentity = await prisma.omniContactIdentity.findFirst({
where: {
teamId: input.teamId,
@@ -95,28 +177,48 @@ async function resolveContact(input: { teamId: string; externalContactId: string
select: { contactId: true },
});
if (existingIdentity?.contactId) {
await maybeHydrateContact(existingIdentity.contactId, input.profile);
return existingIdentity.contactId;
}
const contact = await prisma.contact.create({
data: {
teamId: input.teamId,
name: `Telegram ${input.externalContactId}`,
company: "",
country: "",
location: "",
name: input.profile.displayName,
avatarUrl: input.profile.avatarUrl,
company: null,
country: null,
location: null,
},
select: { id: true },
});
await prisma.omniContactIdentity.create({
data: {
teamId: input.teamId,
contactId: contact.id,
channel: "TELEGRAM",
externalId: input.externalContactId,
},
});
try {
await prisma.omniContactIdentity.create({
data: {
teamId: input.teamId,
contactId: contact.id,
channel: "TELEGRAM",
externalId: input.externalContactId,
},
});
} catch (error) {
if (!isUniqueConstraintError(error)) throw error;
const concurrentIdentity = await prisma.omniContactIdentity.findFirst({
where: {
teamId: input.teamId,
channel: "TELEGRAM",
externalId: input.externalContactId,
},
select: { contactId: true },
});
if (!concurrentIdentity?.contactId) throw error;
await prisma.contact.delete({ where: { id: contact.id } }).catch(() => undefined);
await maybeHydrateContact(concurrentIdentity.contactId, input.profile);
return concurrentIdentity.contactId;
}
return contact.id;
}
@@ -126,6 +228,7 @@ async function upsertThread(input: {
contactId: string;
externalChatId: string;
businessConnectionId: string | null;
title: string | null;
}) {
const existing = await prisma.omniThread.findFirst({
where: {
@@ -134,32 +237,60 @@ async function upsertThread(input: {
externalChatId: input.externalChatId,
businessConnectionId: input.businessConnectionId,
},
select: { id: true },
select: { id: true, title: true },
});
if (existing) {
const data: Prisma.OmniThreadUpdateInput = {
contactId: input.contactId,
};
if (input.title && !existing.title) {
data.title = input.title;
}
await prisma.omniThread.update({
where: { id: existing.id },
data: { contactId: input.contactId },
data,
});
return existing;
}
return prisma.omniThread.create({
data: {
teamId: input.teamId,
contactId: input.contactId,
channel: "TELEGRAM",
externalChatId: input.externalChatId,
businessConnectionId: input.businessConnectionId,
title: null,
},
select: { id: true },
});
try {
return await prisma.omniThread.create({
data: {
teamId: input.teamId,
contactId: input.contactId,
channel: "TELEGRAM",
externalChatId: input.externalChatId,
businessConnectionId: input.businessConnectionId,
title: input.title,
},
select: { id: true },
});
} catch (error) {
if (!isUniqueConstraintError(error)) throw error;
const concurrentThread = await prisma.omniThread.findFirst({
where: {
teamId: input.teamId,
channel: "TELEGRAM",
externalChatId: input.externalChatId,
businessConnectionId: input.businessConnectionId,
},
select: { id: true },
});
if (!concurrentThread) throw error;
await prisma.omniThread.update({
where: { id: concurrentThread.id },
data: { contactId: input.contactId },
});
return concurrentThread;
}
}
async function ingestInbound(env: OmniInboundEnvelopeV1) {
if (env.channel !== "TELEGRAM" || env.direction !== "IN") return;
if (env.channel !== "TELEGRAM") return;
const teamId = await resolveTeamId(env);
if (!teamId) {
@@ -179,13 +310,20 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) {
const businessConnectionId = String(n.businessConnectionId ?? "").trim() || null;
const text = normalizeText(n.text);
const occurredAt = parseOccurredAt(env.occurredAt);
const direction = safeDirection(env.direction);
const contactProfile = buildContactProfile(n, externalContactId);
const contactId = await resolveContact({ teamId, externalContactId });
const contactId = await resolveContact({
teamId,
externalContactId,
profile: contactProfile,
});
const thread = await upsertThread({
teamId,
contactId,
externalChatId,
businessConnectionId,
title: asString(n.chatTitle),
});
if (env.providerMessageId) {
@@ -200,7 +338,7 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) {
teamId,
contactId,
threadId: thread.id,
direction: "IN",
direction,
channel: "TELEGRAM",
status: "DELIVERED",
text,
@@ -222,7 +360,7 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) {
teamId,
contactId,
threadId: thread.id,
direction: "IN",
direction,
channel: "TELEGRAM",
status: "DELIVERED",
text,
@@ -238,7 +376,7 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) {
data: {
contactId,
kind: "MESSAGE",
direction: "IN",
direction,
channel: "TELEGRAM",
content: text,
occurredAt,

View File

@@ -45,6 +45,29 @@ function cropText(value: unknown) {
return value.slice(0, MAX_TEXT_LENGTH);
}
function normalizeString(value: unknown) {
if (typeof value !== "string") return null;
const normalized = value.trim();
return normalized || null;
}
function detectDirection(message: JsonObject, chat: JsonObject, from: JsonObject): "IN" | "OUT" {
if (typeof message.outgoing === "boolean") return message.outgoing ? "OUT" : "IN";
if (typeof message.is_outgoing === "boolean") return message.is_outgoing ? "OUT" : "IN";
if (typeof message.out === "boolean") return message.out ? "OUT" : "IN";
const chatType = normalizeString(chat.type);
if (chatType === "private" && from.is_bot === true) return "OUT";
const chatId = chat.id != null ? String(chat.id) : null;
const fromId = from.id != null ? String(from.id) : null;
if (chatType === "private" && chatId && fromId && chatId !== fromId) {
return "OUT";
}
return "IN";
}
function requireString(value: unknown, fallback: string) {
const v = String(value ?? "").trim();
return v || fallback;
@@ -72,6 +95,9 @@ export function parseTelegramBusinessUpdate(raw: unknown): OmniInboundEnvelopeV1
const chat = asObject(message.chat);
const from = asObject(message.from);
const direction = detectDirection(message, chat, from);
const contactSource = direction === "OUT" && Object.keys(chat).length > 0 ? chat : from;
const fallbackContactSource = direction === "OUT" ? from : chat;
const threadExternalId =
chat.id != null
@@ -80,7 +106,12 @@ export function parseTelegramBusinessUpdate(raw: unknown): OmniInboundEnvelopeV1
? String(businessConnection.user_chat_id)
: null;
const contactExternalId = from.id != null ? String(from.id) : null;
const contactExternalId =
contactSource.id != null
? String(contactSource.id)
: fallbackContactSource.id != null
? String(fallbackContactSource.id)
: null;
const text = cropText(message.text) ?? cropText(message.caption);
@@ -105,7 +136,7 @@ export function parseTelegramBusinessUpdate(raw: unknown): OmniInboundEnvelopeV1
idempotencyKey,
provider: "telegram_business",
channel: "TELEGRAM",
direction: "IN",
direction,
providerEventId,
providerMessageId,
eventType,
@@ -119,9 +150,18 @@ export function parseTelegramBusinessUpdate(raw: unknown): OmniInboundEnvelopeV1
businessConnectionId,
updateId: updateId != null ? String(updateId) : null,
chatTitle: typeof chat.title === "string" ? chat.title : null,
chatUsername: normalizeString(chat.username),
chatFirstName: normalizeString(chat.first_name),
chatLastName: normalizeString(chat.last_name),
contactUsername: normalizeString(contactSource.username),
contactFirstName: normalizeString(contactSource.first_name),
contactLastName: normalizeString(contactSource.last_name),
contactTitle: normalizeString(contactSource.title),
contactAvatarUrl: normalizeString(contactSource.photo_url),
fromUsername: typeof from.username === "string" ? from.username : null,
fromFirstName: typeof from.first_name === "string" ? from.first_name : null,
fromLastName: typeof from.last_name === "string" ? from.last_name : null,
fromIsBot: from.is_bot === true,
},
};
}

View File

@@ -5,7 +5,7 @@ export type OmniInboundEnvelopeV1 = {
idempotencyKey: string;
provider: string;
channel: OmniInboundChannel;
direction: "IN";
direction: "IN" | "OUT";
providerEventId: string;
providerMessageId: string | null;
eventType: string;