Files
clientsflow/omni_chat/src/worker.ts
Ruslan Bakiev 0f87586e81 fix: OUT messages no longer create unread status + handle Telegram read receipts
Only inbound (IN) messages determine hasUnread in getContacts(). Telegram
read_business_message events are now parsed and processed to auto-mark
contacts as read for the entire team.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 14:53:55 +07:00

653 lines
20 KiB
TypeScript

import { Queue, Worker, type ConnectionOptions, type Job } from "bullmq";
import { Prisma } from "@prisma/client";
import { prisma } from "./utils/prisma";
/** Generic object with string keys and values of unknown type. Not referenced in the visible part of this file. */
type JsonObject = Record<string, unknown>;
/**
 * Version 1 of the event envelope that this worker receives as BullMQ job data.
 * One envelope describes a single provider event.
 */
type OmniInboundEnvelopeV1 = {
version: 1;
idempotencyKey: string;
provider: string;
// ingestInbound only processes "TELEGRAM"; other channels are ignored there.
channel: "TELEGRAM" | "WHATSAPP" | "INSTAGRAM" | "PHONE" | "EMAIL" | "INTERNAL";
direction: "IN" | "OUT";
providerEventId: string;
// When present, used together with the thread id as the OmniMessage upsert key.
providerMessageId: string | null;
// ingestInbound special-cases the value "read_business_message".
eventType: string;
// Parsed with `new Date(...)` by parseOccurredAt; unparsable values fall back to the current time.
occurredAt: string;
receivedAt: string;
payloadRaw: unknown;
payloadNormalized: {
threadExternalId: string | null;
contactExternalId: string | null;
text: string | null;
businessConnectionId: string | null;
updateId?: string | null;
// Additional keys read in this file: mediaKind, mediaFileId, mediaDurationSec, mediaTitle,
// contactFirstName, contactLastName, contactUsername, contactTitle, contactAvatarUrl, chatTitle.
[key: string]: unknown;
};
};
/**
 * Name of the BullMQ queue shared by the receiver worker and `receiverQueue()`.
 * Read from the RECEIVER_FLOW_QUEUE_NAME environment variable; an unset, empty
 * or whitespace-only value falls back to "receiver.flow" so the queue name is
 * never an empty string.
 */
export const RECEIVER_FLOW_QUEUE_NAME = (process.env.RECEIVER_FLOW_QUEUE_NAME ?? "").trim() || "receiver.flow";
/** Prefix of the placeholder contact name built when no real name is known ("Telegram <externalId>"). */
const TELEGRAM_PLACEHOLDER_PREFIX = "Telegram ";
/** Prefix put in front of a Telegram file id to form the stored ContactMessage audioUrl. */
const TELEGRAM_AUDIO_FILE_MARKER = "tg-file:";
/** Default number of bins (samples) in a generated waveform. */
const TELEGRAM_WAVE_BINS = 96;
/**
 * Builds the Redis connection options for BullMQ from the REDIS_URL
 * environment variable (default: "redis://localhost:6379").
 *
 * - Username and password are percent-decoded.
 * - The first path segment selects the database only when it is a plain
 *   non-negative integer; anything else leaves `db` undefined instead of NaN.
 * - A `rediss://` URL enables TLS; without the `tls` option the client would
 *   open a plaintext socket to a TLS endpoint.
 * - `maxRetriesPerRequest: null` is kept as in the original configuration.
 *
 * @returns Connection options for a BullMQ Queue/Worker.
 * @throws TypeError when REDIS_URL is not a parsable URL.
 */
function redisConnectionFromEnv(): ConnectionOptions {
  const raw = (process.env.REDIS_URL || "redis://localhost:6379").trim();
  const parsed = new URL(raw);

  const dbSegment = parsed.pathname.replace(/^\/+/, "");
  const db = /^\d+$/.test(dbSegment) ? Number(dbSegment) : undefined;

  return {
    host: parsed.hostname,
    port: parsed.port ? Number(parsed.port) : 6379,
    username: parsed.username ? decodeURIComponent(parsed.username) : undefined,
    password: parsed.password ? decodeURIComponent(parsed.password) : undefined,
    db,
    // Only add the key for TLS URLs so plain connections keep the exact original option set.
    ...(parsed.protocol === "rediss:" ? { tls: {} } : {}),
    maxRetriesPerRequest: null,
  };
}
/**
 * Converts any value to a trimmed string.
 * null/undefined and blank results are replaced by the literal "[no text]".
 */
function normalizeText(input: unknown) {
  const source = input === null || input === undefined ? "" : input;
  const cleaned = String(source).trim();
  return cleaned.length > 0 ? cleaned : "[no text]";
}
/** Media attachment details extracted from an envelope's normalized payload by parseTelegramInboundMedia. */
type TelegramInboundMedia = {
// Recognised media type, or null when the payload's mediaKind is absent or not one of these values.
kind: "voice" | "audio" | "video_note" | null;
// Trimmed, non-empty mediaFileId from the payload, or null.
fileId: string | null;
// Positive whole number of seconds (at least 1), or null when missing or not a positive finite number.
durationSec: number | null;
// Trimmed, non-empty mediaTitle from the payload, or null.
label: string | null;
};
/**
 * Extracts media information from the normalized payload.
 *
 * - `mediaKind` is matched case-insensitively against voice/audio/video_note;
 *   anything else yields `kind: null`.
 * - `mediaDurationSec` may be a number or a numeric string; only finite values
 *   greater than zero are kept, rounded to a whole number with a minimum of 1.
 * - `mediaFileId` and `mediaTitle` are kept only when they are non-blank strings.
 */
function parseTelegramInboundMedia(normalized: OmniInboundEnvelopeV1["payloadNormalized"]): TelegramInboundMedia {
  const kindToken = String(normalized.mediaKind ?? "").trim().toLowerCase();
  let kind: TelegramInboundMedia["kind"] = null;
  if (kindToken === "voice" || kindToken === "audio" || kindToken === "video_note") {
    kind = kindToken;
  }

  const rawDuration = normalized.mediaDurationSec;
  let seconds = Number.NaN;
  if (typeof rawDuration === "number") {
    seconds = rawDuration;
  } else if (typeof rawDuration === "string") {
    seconds = Number(rawDuration);
  }
  const hasUsableDuration = Number.isFinite(seconds) && seconds > 0;
  const durationSec = hasUsableDuration ? Math.max(1, Math.round(seconds)) : null;

  return {
    kind,
    fileId: asString(normalized.mediaFileId),
    durationSec,
    label: asString(normalized.mediaTitle),
  };
}
/**
 * Produces placeholder message text for a media-only message.
 * Returns null when the media has no kind; otherwise a bracketed tag, with the
 * label appended for the audio case when one is available.
 */
function fallbackTextFromMedia(media: TelegramInboundMedia) {
  const { kind, label } = media;
  if (!kind) return null;
  switch (kind) {
    case "voice":
      return "[voice message]";
    case "video_note":
      return "[video note]";
    default:
      return label ? `[audio] ${label}` : "[audio]";
  }
}
/**
 * Generates a deterministic pseudo-random waveform from a seed string, used
 * when no waveform can be derived from the actual file.
 *
 * The seed string is hashed into a 32-bit state that drives a linear
 * congruential generator; smoothed noise is combined with a sine "burst"
 * envelope whose frequency depends on the seed length.
 *
 * @param seedText Text that determines the output; equal text gives equal output.
 * @param bins Number of samples to produce.
 * @returns `bins` values, each clamped to [0.06, 1] and rounded to 4 decimals.
 */
function buildFallbackWaveform(seedText: string, bins = TELEGRAM_WAVE_BINS) {
  // Fold the seed text into an unsigned 32-bit state.
  let state = 0;
  let cursor = 0;
  while (cursor < seedText.length) {
    state = (state * 33 + seedText.charCodeAt(cursor)) >>> 0;
    cursor += 1;
  }

  // LCG step; yields a value in [0, 1].
  const nextUnit = (): number => {
    state = (state * 1664525 + 1013904223) >>> 0;
    return state / 0xffffffff;
  };

  const lobes = 2 + (seedText.length % 5);
  const span = Math.max(1, bins - 1);
  const peaks: number[] = [];
  let drift = 0;
  for (let index = 0; index < bins; index += 1) {
    const position = index / span;
    const envelope = Math.max(0, Math.sin(position * Math.PI * lobes));
    const jitter = (nextUnit() * 2 - 1) * 0.6;
    drift = drift * 0.72 + jitter * 0.28;
    const level = 0.12 + Math.abs(drift) * 0.42 + envelope * 0.4;
    const clamped = Math.max(0.06, Math.min(1, level));
    peaks.push(Number(clamped.toFixed(4)));
  }
  return peaks;
}
/**
 * Derives a normalized waveform from raw file bytes.
 *
 * Each bin averages the distance of its bytes from the midpoint value 128.
 * The per-bin energies are smoothed with an exponential moving average and
 * then scaled so the largest value maps to 0.96 and the floor is 0.06.
 *
 * Bytes are assigned to bins proportionally (bin i covers
 * [floor(i*len/bins), floor((i+1)*len/bins))), and every bin samples at least
 * one byte. This keeps the whole input spread across all bins; fixed
 * ceil-sized buckets would leave trailing bins empty whenever the byte count
 * is not a multiple of the bin count.
 *
 * @param bytes Raw bytes to analyse.
 * @param bins Number of output samples; values below 1 yield an empty result.
 * @returns `bins` values in [0.06, 1] rounded to 4 decimals, or an empty array
 *          when there is no input or the input carries no energy.
 */
function buildWaveformFromBytes(bytes: Uint8Array, bins = TELEGRAM_WAVE_BINS) {
  const binCount = Math.floor(bins);
  const total = bytes.length;
  if (!total || !(binCount > 0)) return [];

  const raw: number[] = [];
  for (let i = 0; i < binCount; i += 1) {
    const start = Math.floor((i * total) / binCount);
    // Guarantee a non-empty slice even when there are fewer bytes than bins.
    const end = Math.min(total, Math.max(start + 1, Math.floor(((i + 1) * total) / binCount)));
    let energy = 0;
    for (let j = start; j < end; j += 1) {
      energy += Math.abs(bytes[j] - 128) / 128;
    }
    raw.push(energy / (end - start));
  }

  const smooth: number[] = [];
  let prev = 0;
  let maxValue = 0;
  for (const value of raw) {
    prev = prev * 0.78 + value * 0.22;
    smooth.push(prev);
    if (prev > maxValue) maxValue = prev;
  }
  if (maxValue <= 0) return [];

  return smooth.map((value) => {
    const normalized = value / maxValue;
    const mapped = Math.max(0.06, Math.min(1, normalized * 0.9 + 0.06));
    return Number(mapped.toFixed(4));
  });
}
/**
 * Downloads a Telegram file by id using the Bot API.
 *
 * First calls `getFile` to resolve the file path, then downloads the file
 * itself. The bot token comes from TELEGRAM_BOT_TOKEN and the API root from
 * TELEGRAM_API_BASE (default "https://api.telegram.org", trailing slashes removed).
 *
 * @returns The file contents, or null when no token is configured, the lookup
 *          fails or returns no path, or the download response is not OK.
 *          Network errors from `fetch` propagate to the caller.
 */
async function fetchTelegramFileBytes(fileId: string) {
  const botToken = String(process.env.TELEGRAM_BOT_TOKEN ?? "").trim();
  if (botToken === "") return null;

  const apiRoot = String(process.env.TELEGRAM_API_BASE ?? "https://api.telegram.org").replace(/\/+$/, "");

  const lookupResponse = await fetch(`${apiRoot}/bot${botToken}/getFile`, {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify({ file_id: fileId }),
  });

  type GetFileReply = { ok?: boolean; result?: { file_path?: string } } | null;
  let lookup: GetFileReply;
  try {
    lookup = (await lookupResponse.json()) as GetFileReply;
  } catch {
    // A body that is not valid JSON is treated like a failed lookup.
    lookup = null;
  }

  const remotePath = String(lookup?.result?.file_path ?? "").trim();
  const lookupSucceeded = lookupResponse.ok && Boolean(lookup?.ok) && remotePath !== "";
  if (!lookupSucceeded) return null;

  const download = await fetch(`${apiRoot}/file/bot${botToken}/${remotePath}`);
  if (!download.ok) return null;

  const buffer = await download.arrayBuffer();
  return new Uint8Array(buffer);
}
/**
 * Produces waveform peaks for an inbound media message.
 *
 * Tries to download the file and derive the waveform from its bytes. Falls
 * back to a deterministic synthetic waveform (seeded from file id, duration
 * and text) when there is no file id, the download yields nothing, the derived
 * waveform is empty, or any error is thrown along the way.
 */
async function resolveInboundWaveform(media: TelegramInboundMedia, text: string) {
  const { fileId, durationSec } = media;
  const synthetic = buildFallbackWaveform(`${fileId ?? "none"}:${durationSec ?? "0"}:${text}`);
  if (!fileId) return synthetic;

  try {
    const payload = await fetchTelegramFileBytes(fileId);
    if (!payload || payload.length === 0) return synthetic;
    const measured = buildWaveformFromBytes(payload);
    return measured.length > 0 ? measured : synthetic;
  } catch {
    // Best effort: a failed download must not fail the ingest.
    return synthetic;
  }
}
/**
 * Parses a timestamp string into a Date.
 * Missing or unparsable input yields the current time instead of an invalid Date.
 */
function parseOccurredAt(input: string | null | undefined) {
  const candidate = new Date(String(input ?? ""));
  return Number.isNaN(candidate.getTime()) ? new Date() : candidate;
}
/**
 * Returns the trimmed input when it is a non-blank string; otherwise null.
 * Non-string values are never coerced.
 */
function asString(input: unknown) {
  if (typeof input === "string") {
    const cleaned = input.trim();
    if (cleaned.length > 0) return cleaned;
  }
  return null;
}
/** Maps any value to a message direction: exactly "OUT" stays "OUT", everything else becomes "IN". */
function safeDirection(input: unknown): "IN" | "OUT" {
  if (input === "OUT") return "OUT";
  return "IN";
}
/** True when the error is a Prisma known-request error carrying code "P2002". */
function isUniqueConstraintError(error: unknown) {
  if (!(error instanceof Prisma.PrismaClientKnownRequestError)) return false;
  return error.code === "P2002";
}
/** Display data for a contact, as derived from an envelope by buildContactProfile. */
type ContactProfile = {
// Always non-empty: a real name, an "@username", a title, or a "Telegram <id>" placeholder.
displayName: string;
// Trimmed contactAvatarUrl from the payload, or null when absent.
avatarUrl: string | null;
};
/**
 * Builds the contact's display profile from the normalized payload.
 *
 * Display name preference: "first last" → "@username" → contact title →
 * placeholder made of the Telegram prefix and the external contact id.
 */
function buildContactProfile(
  normalized: OmniInboundEnvelopeV1["payloadNormalized"],
  externalContactId: string,
): ContactProfile {
  // Use only normalized contact-* fields (counterparty), avoid sender/chat fallbacks
  // to prevent accidental renames to the business owner name on OUT events.
  const nameParts = [asString(normalized.contactFirstName), asString(normalized.contactLastName)];
  const fullName = nameParts.filter(Boolean).join(" ");
  const handle = asString(normalized.contactUsername);
  const title = asString(normalized.contactTitle);

  let displayName: string;
  if (fullName) {
    displayName = fullName;
  } else if (handle) {
    // Normalise to exactly one leading "@".
    displayName = `@${handle.replace(/^@/, "")}`;
  } else if (title) {
    displayName = title;
  } else {
    displayName = `${TELEGRAM_PLACEHOLDER_PREFIX}${externalContactId}`;
  }

  const avatarUrl = asString(normalized.contactAvatarUrl);
  return { displayName, avatarUrl };
}
/**
 * Fills in missing contact details without overwriting real data.
 *
 * - The name is replaced only when the stored name is blank or still a
 *   "Telegram …" placeholder, and the incoming name differs from it.
 * - The avatar is set only when none is stored yet.
 * No update is issued when nothing would change or the contact does not exist.
 */
async function maybeHydrateContact(contactId: string, profile: ContactProfile) {
  const stored = await prisma.contact.findUnique({
    where: { id: contactId },
    select: { name: true, avatarUrl: true },
  });
  if (!stored) return;

  const patch: Prisma.ContactUpdateInput = {};

  const storedName = asString(stored.name);
  const incomingName = asString(profile.displayName);
  const nameIsReplaceable = !storedName || storedName.startsWith(TELEGRAM_PLACEHOLDER_PREFIX);
  if (incomingName && nameIsReplaceable && storedName !== incomingName) {
    patch.name = incomingName;
  }

  const storedAvatar = asString(stored.avatarUrl);
  if (profile.avatarUrl && !storedAvatar) {
    patch.avatarUrl = profile.avatarUrl;
  }

  if (Object.keys(patch).length === 0) return;
  await prisma.contact.update({ where: { id: contactId }, data: patch });
}
/**
 * Determines which team an envelope belongs to.
 *
 * Tried in order:
 *  1. the business connection whose id equals the payload's businessConnectionId;
 *  2. a pseudo connection with id "link:<contactExternalId or threadExternalId>";
 *  3. the DEFAULT_TEAM_ID environment variable;
 *  4. the team with id "demo-team", if it exists.
 *
 * @returns The team id, or null when none of the steps yields one.
 */
async function resolveTeamId(env: OmniInboundEnvelopeV1) {
  const normalized = env.payloadNormalized ?? ({} as OmniInboundEnvelopeV1["payloadNormalized"]);

  // Most recently updated connection row wins when several share an id.
  const teamForConnection = async (businessConnectionId: string) => {
    const row = await prisma.telegramBusinessConnection.findFirst({
      where: { businessConnectionId },
      orderBy: { updatedAt: "desc" },
      select: { teamId: true },
    });
    return row?.teamId || null;
  };

  const connectionId = String(normalized.businessConnectionId ?? "").trim();
  if (connectionId) {
    const teamId = await teamForConnection(connectionId);
    if (teamId) return teamId;
  }

  const externalContactId = String(normalized.contactExternalId ?? normalized.threadExternalId ?? "").trim();
  if (externalContactId) {
    const teamId = await teamForConnection(`link:${externalContactId}`);
    if (teamId) return teamId;
  }

  const configuredTeamId = String(process.env.DEFAULT_TEAM_ID || "").trim();
  if (configuredTeamId) return configuredTeamId;

  const demoTeam = await prisma.team.findFirst({
    where: { id: "demo-team" },
    select: { id: true },
  });
  return demoTeam?.id ?? null;
}
/**
 * Finds or creates the contact for a Telegram external id within a team.
 *
 * If an identity row already links the external id to a contact, that contact
 * is hydrated with the incoming profile and returned. Otherwise a new contact
 * and its identity row are created; a unique-constraint failure on the
 * identity is treated as a concurrent insert and resolved in favour of the
 * existing identity.
 *
 * @returns The id of the resolved contact.
 * @throws Any non-unique-constraint error from Prisma, or the original
 *         unique-constraint error if the conflicting identity cannot be found.
 */
async function resolveContact(input: {
teamId: string;
externalContactId: string;
profile: ContactProfile;
}) {
// Fast path: the external id is already linked to a contact.
const existingIdentity = await prisma.omniContactIdentity.findFirst({
where: {
teamId: input.teamId,
channel: "TELEGRAM",
externalId: input.externalContactId,
},
select: { contactId: true },
});
if (existingIdentity?.contactId) {
await maybeHydrateContact(existingIdentity.contactId, input.profile);
return existingIdentity.contactId;
}
// NOTE(review): contact and identity are created in two separate statements, not a transaction;
// a failure other than the handled unique-constraint case leaves the new contact without an identity.
const contact = await prisma.contact.create({
data: {
teamId: input.teamId,
name: input.profile.displayName,
avatarUrl: input.profile.avatarUrl,
},
select: { id: true },
});
try {
await prisma.omniContactIdentity.create({
data: {
teamId: input.teamId,
contactId: contact.id,
channel: "TELEGRAM",
externalId: input.externalContactId,
},
});
} catch (error) {
// Only a unique-constraint violation is recoverable here.
if (!isUniqueConstraintError(error)) throw error;
const concurrentIdentity = await prisma.omniContactIdentity.findFirst({
where: {
teamId: input.teamId,
channel: "TELEGRAM",
externalId: input.externalContactId,
},
select: { contactId: true },
});
if (!concurrentIdentity?.contactId) throw error;
// Remove the contact created above; a failed delete is deliberately ignored.
await prisma.contact.delete({ where: { id: contact.id } }).catch(() => undefined);
await maybeHydrateContact(concurrentIdentity.contactId, input.profile);
return concurrentIdentity.contactId;
}
return contact.id;
}
/**
 * Finds or creates the Telegram thread identified by team, external chat id
 * and business connection id, and (re)connects it to the given contact.
 *
 * An existing thread's title is only set when it has none yet. A
 * unique-constraint failure on create is treated as a concurrent insert: the
 * existing thread is looked up again and connected to the contact.
 *
 * @returns An object containing the thread `id` (the existing-thread path also
 *          carries the previously stored `title`).
 * @throws Any non-unique-constraint error from Prisma, or the original
 *         unique-constraint error if the conflicting thread cannot be found.
 */
async function upsertThread(input: {
teamId: string;
contactId: string;
externalChatId: string;
businessConnectionId: string | null;
title: string | null;
}) {
const existing = await prisma.omniThread.findFirst({
where: {
teamId: input.teamId,
channel: "TELEGRAM",
externalChatId: input.externalChatId,
businessConnectionId: input.businessConnectionId,
},
select: { id: true, title: true },
});
if (existing) {
// Always point the thread at the resolved contact.
const data: Prisma.OmniThreadUpdateInput = {
contact: { connect: { id: input.contactId } },
};
// Never overwrite an existing title.
if (input.title && !existing.title) {
data.title = input.title;
}
await prisma.omniThread.update({
where: { id: existing.id },
data,
});
return existing;
}
try {
return await prisma.omniThread.create({
data: {
teamId: input.teamId,
contactId: input.contactId,
channel: "TELEGRAM",
externalChatId: input.externalChatId,
businessConnectionId: input.businessConnectionId,
title: input.title,
},
select: { id: true },
});
} catch (error) {
// Only a unique-constraint violation (another writer created the thread first) is recoverable.
if (!isUniqueConstraintError(error)) throw error;
const concurrentThread = await prisma.omniThread.findFirst({
where: {
teamId: input.teamId,
channel: "TELEGRAM",
externalChatId: input.externalChatId,
businessConnectionId: input.businessConnectionId,
},
select: { id: true },
});
if (!concurrentThread) throw error;
// The title is not touched on this recovery path.
await prisma.omniThread.update({
where: { id: concurrentThread.id },
data: { contact: { connect: { id: input.contactId } } },
});
return concurrentThread;
}
}
/**
 * Creates or updates the contact inbox keyed by (team, channel, source external id).
 *
 * On update the inbox is re-pointed at the given contact; the title is only
 * overwritten when a non-empty title is supplied.
 *
 * @returns The inbox row's `id`.
 */
async function upsertContactInbox(input: {
  teamId: string;
  contactId: string;
  channel: "TELEGRAM";
  sourceExternalId: string;
  title: string | null;
}) {
  const { teamId, contactId, channel, sourceExternalId, title } = input;
  // An absent title must not clear one that is already stored.
  const titlePatch = title ? { title } : {};

  return prisma.contactInbox.upsert({
    where: {
      teamId_channel_sourceExternalId: { teamId, channel, sourceExternalId },
    },
    create: { teamId, contactId, channel, sourceExternalId, title },
    update: { contactId, ...titlePatch },
    select: { id: true },
  });
}
/**
 * Handles a Telegram "read_business_message" event by marking the thread's
 * contact as read for every member of the resolved team.
 *
 * Silently returns when the team, the external chat id or the thread cannot
 * be resolved.
 */
async function handleReadBusinessMessage(env: OmniInboundEnvelopeV1) {
const teamId = await resolveTeamId(env);
if (!teamId) return;
const n = env.payloadNormalized ?? ({} as OmniInboundEnvelopeV1["payloadNormalized"]);
const externalChatId = String(n.threadExternalId ?? n.contactExternalId ?? "").trim();
if (!externalChatId) return;
// The lookup matches on team, channel and chat id only; businessConnectionId is not part of the filter.
const thread = await prisma.omniThread.findFirst({
where: { teamId, channel: "TELEGRAM", externalChatId },
select: { contactId: true },
});
if (!thread) return;
const teamUsers = await prisma.teamMember.findMany({
where: { teamId },
select: { userId: true },
});
// One timestamp shared by all rows written below.
const now = new Date();
// ContactThreadRead is not in omni_chat's Prisma schema, use raw upsert
// NOTE(review): ON CONFLICT ("userId", "contactId") presumably relies on a unique constraint
// on those columns in the database — confirm against the owning schema.
await Promise.all(
teamUsers.map((u) =>
prisma.$executeRaw`
INSERT INTO "ContactThreadRead" ("id", "teamId", "userId", "contactId", "readAt")
VALUES (gen_random_uuid(), ${teamId}, ${u.userId}, ${thread.contactId}, ${now})
ON CONFLICT ("userId", "contactId") DO UPDATE SET "readAt" = ${now}
`,
),
);
console.log(`[omni_chat] read_business_message: marked contact ${thread.contactId} as read for ${teamUsers.length} users`);
}
/**
 * Processes one envelope from the receiver queue.
 *
 * Non-Telegram envelopes are ignored. "read_business_message" events are
 * delegated to handleReadBusinessMessage. For every other Telegram event the
 * function resolves team, contact, thread and inbox, then stores the message
 * as an OmniMessage (upserted when a provider message id is present) and as a
 * ContactMessage.
 *
 * Envelopes whose team or contact/chat ids cannot be resolved are skipped
 * with a warning. Errors from Prisma propagate to the caller.
 */
async function ingestInbound(env: OmniInboundEnvelopeV1) {
if (env.channel !== "TELEGRAM") return;
if (env.eventType === "read_business_message") {
await handleReadBusinessMessage(env);
return;
}
const teamId = await resolveTeamId(env);
if (!teamId) {
console.warn("[omni_chat] skip inbound: team not resolved", env.providerEventId);
return;
}
const n = env.payloadNormalized ?? ({} as OmniInboundEnvelopeV1["payloadNormalized"]);
// Contact id and chat id fall back to each other when only one is present.
const externalContactId = String(n.contactExternalId ?? n.threadExternalId ?? "").trim();
const externalChatId = String(n.threadExternalId ?? n.contactExternalId ?? "").trim();
if (!externalContactId || !externalChatId) {
console.warn("[omni_chat] skip inbound: missing contact/chat ids", env.providerEventId);
return;
}
const businessConnectionId = String(n.businessConnectionId ?? "").trim() || null;
const media = parseTelegramInboundMedia(n);
// Message text, else a media placeholder, else "[no text]".
const text = normalizeText(asString(n.text) ?? fallbackTextFromMedia(media));
// Media with a file id is stored as a "CALL"-kind ContactMessage with an audio URL and waveform.
const isAudioLike = Boolean(media.fileId) && (media.kind === "voice" || media.kind === "audio" || media.kind === "video_note");
const contactMessageKind: "MESSAGE" | "CALL" = isAudioLike ? "CALL" : "MESSAGE";
const contactMessageAudioUrl = isAudioLike ? `${TELEGRAM_AUDIO_FILE_MARKER}${media.fileId}` : null;
// May download the file from Telegram; happens before any database write.
const waveformPeaks = isAudioLike ? await resolveInboundWaveform(media, text) : null;
const occurredAt = parseOccurredAt(env.occurredAt);
const direction = safeDirection(env.direction);
const contactProfile = buildContactProfile(n, externalContactId);
const contactId = await resolveContact({
teamId,
externalContactId,
profile: contactProfile,
});
const thread = await upsertThread({
teamId,
contactId,
externalChatId,
businessConnectionId,
title: asString(n.chatTitle),
});
const inbox = await upsertContactInbox({
teamId,
contactId,
channel: "TELEGRAM",
sourceExternalId: externalChatId,
title: asString(n.chatTitle),
});
// Snapshot stored in OmniMessage.rawJson: the cleaned values used above plus the original payloads.
const rawEnvelope = {
version: env.version,
source: "omni_chat.receiver",
provider: env.provider,
channel: env.channel,
direction,
providerEventId: env.providerEventId,
receivedAt: env.receivedAt,
occurredAt: occurredAt.toISOString(),
normalized: {
text,
threadExternalId: externalChatId,
contactExternalId: externalContactId,
businessConnectionId,
mediaKind: media.kind,
mediaFileId: media.fileId,
mediaDurationSec: media.durationSec,
mediaLabel: media.label,
},
payloadNormalized: n,
payloadRaw: (env.payloadRaw ?? null) as Prisma.InputJsonValue,
} as Prisma.InputJsonValue;
if (env.providerMessageId) {
// Keyed by (thread, provider message id): a repeated event for the same message updates the row.
await prisma.omniMessage.upsert({
where: {
threadId_providerMessageId: {
threadId: thread.id,
providerMessageId: env.providerMessageId,
},
},
create: {
teamId,
contactId,
threadId: thread.id,
direction,
channel: "TELEGRAM",
status: "DELIVERED",
text,
providerMessageId: env.providerMessageId,
providerUpdateId: String((n.updateId as string | null | undefined) ?? env.providerEventId),
rawJson: rawEnvelope,
occurredAt,
},
update: {
text,
providerUpdateId: String((n.updateId as string | null | undefined) ?? env.providerEventId),
rawJson: rawEnvelope,
occurredAt,
},
});
} else {
// No provider message id: there is no key to de-duplicate on, so a row is always inserted.
await prisma.omniMessage.create({
data: {
teamId,
contactId,
threadId: thread.id,
direction,
channel: "TELEGRAM",
status: "DELIVERED",
text,
providerMessageId: null,
providerUpdateId: String((n.updateId as string | null | undefined) ?? env.providerEventId),
rawJson: rawEnvelope,
occurredAt,
},
});
}
// NOTE(review): this insert runs on every call, including when the upsert above took the update
// path or the job is retried, so a repeated event appears to add another ContactMessage row —
// confirm whether that is intended.
await prisma.contactMessage.create({
data: {
contactId,
contactInboxId: inbox.id,
kind: contactMessageKind,
direction,
channel: "TELEGRAM",
content: text,
audioUrl: contactMessageAudioUrl,
durationSec: media.durationSec,
...(waveformPeaks ? { waveformJson: waveformPeaks as Prisma.InputJsonValue } : {}),
occurredAt,
},
});
}
/** The single worker created by startReceiverWorker(); null before start and after closeReceiverWorker(). */
let workerInstance: Worker<OmniInboundEnvelopeV1, unknown, "ingest"> | null = null;

/**
 * Starts the BullMQ worker that feeds queue jobs into ingestInbound.
 *
 * Calling it again while a worker exists returns that same worker.
 * Concurrency comes from OMNI_CHAT_WORKER_CONCURRENCY (default 4); a value
 * that is not a positive integer is reported and replaced by the default
 * rather than being handed to BullMQ as NaN.
 *
 * @returns The running worker instance.
 */
export function startReceiverWorker() {
  if (workerInstance) return workerInstance;

  const defaultConcurrency = 4;
  const requested = Number(process.env.OMNI_CHAT_WORKER_CONCURRENCY || defaultConcurrency);
  const concurrencyIsValid = Number.isInteger(requested) && requested > 0;
  if (!concurrencyIsValid) {
    console.warn(
      `[omni_chat] invalid OMNI_CHAT_WORKER_CONCURRENCY=${process.env.OMNI_CHAT_WORKER_CONCURRENCY}; using ${defaultConcurrency}`,
    );
  }

  const worker = new Worker<OmniInboundEnvelopeV1, unknown, "ingest">(
    RECEIVER_FLOW_QUEUE_NAME,
    async (job) => {
      await ingestInbound(job.data);
    },
    {
      connection: redisConnectionFromEnv(),
      concurrency: concurrencyIsValid ? requested : defaultConcurrency,
    },
  );
  worker.on("failed", (job: Job<OmniInboundEnvelopeV1, unknown, "ingest"> | undefined, err: Error) => {
    console.error(`[omni_chat] receiver job failed id=${job?.id || "unknown"}: ${err?.message || err}`);
  });
  // Without an "error" listener, a connection-level error emitted by the worker is an
  // unhandled EventEmitter error; log it so the worker keeps running.
  worker.on("error", (err: Error) => {
    console.error(`[omni_chat] receiver worker error: ${err?.message || err}`);
  });
  workerInstance = worker;
  return worker;
}
/**
 * Closes the running receiver worker, if any, and clears the singleton so a
 * later startReceiverWorker() call creates a fresh one. Does nothing when no
 * worker is running.
 */
export async function closeReceiverWorker() {
  if (workerInstance === null) {
    return;
  }
  await workerInstance.close();
  workerInstance = null;
}
/**
 * Creates a BullMQ queue handle for the receiver flow queue.
 * Every call constructs a new Queue object with connection options read from
 * the environment.
 */
export function receiverQueue() {
  const connection = redisConnectionFromEnv();
  return new Queue<OmniInboundEnvelopeV1, unknown, "ingest">(RECEIVER_FLOW_QUEUE_NAME, { connection });
}