From acd974766a216b16c2d388b86d8b1f2a0fba016e Mon Sep 17 00:00:00 2001 From: Ruslan Bakiev <572431+veikab@users.noreply.github.com> Date: Mon, 23 Feb 2026 12:21:53 +0700 Subject: [PATCH] feat(telegram): ingest and render inbound voice messages --- .../server/api/omni/telegram/media.get.ts | 76 +++++++++++++++++ frontend/server/graphql/schema.ts | 18 +++- omni_chat/src/worker.ts | 54 +++++++++++- omni_inbound/src/telegram.ts | 82 ++++++++++++++++++- 4 files changed, 225 insertions(+), 5 deletions(-) create mode 100644 frontend/server/api/omni/telegram/media.get.ts diff --git a/frontend/server/api/omni/telegram/media.get.ts b/frontend/server/api/omni/telegram/media.get.ts new file mode 100644 index 0000000..982ba38 --- /dev/null +++ b/frontend/server/api/omni/telegram/media.get.ts @@ -0,0 +1,76 @@ +import { getQuery, setHeader } from "h3"; +import { getAuthContext } from "../../../utils/auth"; +import { prisma } from "../../../utils/prisma"; +import { requireTelegramBotToken, telegramApiBase, telegramBotApi } from "../../../utils/telegram"; + +const TELEGRAM_AUDIO_FILE_MARKER = "tg-file:"; + +type TelegramFileMeta = { + file_id: string; + file_path?: string; +}; + +function parseTelegramFileId(audioUrl: string | null | undefined) { + const raw = String(audioUrl ?? "").trim(); + if (!raw.startsWith(TELEGRAM_AUDIO_FILE_MARKER)) return null; + const fileId = raw.slice(TELEGRAM_AUDIO_FILE_MARKER.length).trim(); + return fileId || null; +} + +export default defineEventHandler(async (event) => { + const auth = await getAuthContext(event); + const query = getQuery(event); + const messageId = String(query.messageId ?? "").trim(); + + if (!messageId) { + throw createError({ statusCode: 400, statusMessage: "messageId is required" }); + } + + const message = await prisma.contactMessage.findFirst({ + where: { + id: messageId, + channel: "TELEGRAM", + contact: { teamId: auth.teamId }, + }, + select: { + audioUrl: true, + }, + }); + + if (!message) { + throw createError({ statusCode: 404, statusMessage: "telegram message not found" }); + } + + const fileId = parseTelegramFileId(message.audioUrl); + if (!fileId) { + throw createError({ statusCode: 404, statusMessage: "telegram audio is missing" }); + } + + const meta = await telegramBotApi("getFile", { file_id: fileId }); + const filePath = String(meta?.file_path ?? "").trim(); + if (!filePath) { + throw createError({ statusCode: 502, statusMessage: "telegram file path is unavailable" }); + } + + const apiBase = telegramApiBase().replace(/\/+$/, ""); + const token = requireTelegramBotToken(); + const remote = await fetch(`${apiBase}/file/bot${token}/${filePath}`); + if (!remote.ok) { + throw createError({ statusCode: 502, statusMessage: `telegram file fetch failed (${remote.status})` }); + } + + const contentType = remote.headers.get("content-type") || "audio/ogg"; + const contentLength = remote.headers.get("content-length"); + const body = Buffer.from(await remote.arrayBuffer()); + + setHeader(event, "content-type", contentType); + setHeader(event, "cache-control", "private, max-age=60"); + if (contentLength) { + const parsedLength = Number(contentLength); + if (Number.isFinite(parsedLength) && parsedLength >= 0) { + setHeader(event, "content-length", parsedLength); + } + } + + return body; +}); diff --git a/frontend/server/graphql/schema.ts b/frontend/server/graphql/schema.ts index 6f291f8..a0d3d17 100644 --- a/frontend/server/graphql/schema.ts +++ b/frontend/server/graphql/schema.ts @@ -67,6 +67,7 @@ function extractOmniNormalizedText(rawJson: unknown, fallbackText = "") { type ClientTimelineContentType = "CALENDAR_EVENT" | "DOCUMENT" | "RECOMMENDATION"; const CONTACT_DOCUMENT_SCOPE_PREFIX = "contact:"; +const TELEGRAM_AUDIO_FILE_MARKER = "tg-file:"; function mapTimelineContentType(value: ClientTimelineContentType) { if (value === "CALENDAR_EVENT") return "calendar_event"; @@ -141,6 +142,19 @@ function visibleMessageWhere(hiddenInboxIds: string[]) { }; } +function resolveContactMessageAudioUrl(message: { + id: string; + channel: string; + audioUrl: string | null; +}) { + const raw = String(message.audioUrl ?? "").trim(); + if (!raw) return ""; + if (message.channel === "TELEGRAM" && raw.startsWith(TELEGRAM_AUDIO_FILE_MARKER)) { + return `/api/omni/telegram/media?messageId=${encodeURIComponent(message.id)}`; + } + return raw; +} + async function upsertContactInbox(input: { teamId: string; contactId: string; @@ -662,7 +676,7 @@ async function getDashboard(auth: AuthContext | null) { kind: m.kind === "CALL" ? "call" : "message", direction: m.direction === "IN" ? "in" : "out", text: m.content, - audioUrl: "", + audioUrl: resolveContactMessageAudioUrl(m), duration: m.durationSec ? new Date(m.durationSec * 1000).toISOString().slice(14, 19) : "", transcript: Array.isArray(m.transcriptJson) ? ((m.transcriptJson as any) as string[]) : [], deliveryStatus: resolveDeliveryStatus(m), @@ -918,7 +932,7 @@ async function getClientTimeline(auth: AuthContext | null, contactIdInput: strin kind: m.kind === "CALL" ? "call" : "message", direction: m.direction === "IN" ? "in" : "out", text: m.content, - audioUrl: "", + audioUrl: resolveContactMessageAudioUrl(m), duration: m.durationSec ? new Date(m.durationSec * 1000).toISOString().slice(14, 19) : "", transcript: Array.isArray(m.transcriptJson) ? ((m.transcriptJson as any) as string[]) : [], deliveryStatus: resolveDeliveryStatus(m), diff --git a/omni_chat/src/worker.ts b/omni_chat/src/worker.ts index c98d78e..033971e 100644 --- a/omni_chat/src/worker.ts +++ b/omni_chat/src/worker.ts @@ -28,6 +28,7 @@ type OmniInboundEnvelopeV1 = { export const RECEIVER_FLOW_QUEUE_NAME = (process.env.RECEIVER_FLOW_QUEUE_NAME || "receiver.flow").trim(); const TELEGRAM_PLACEHOLDER_PREFIX = "Telegram "; +const TELEGRAM_AUDIO_FILE_MARKER = "tg-file:"; function redisConnectionFromEnv(): ConnectionOptions { const raw = (process.env.REDIS_URL || "redis://localhost:6379").trim(); @@ -47,6 +48,45 @@ function normalizeText(input: unknown) { return t || "[no text]"; } +type TelegramInboundMedia = { + kind: "voice" | "audio" | "video_note" | null; + fileId: string | null; + durationSec: number | null; + label: string | null; +}; + +function parseTelegramInboundMedia(normalized: OmniInboundEnvelopeV1["payloadNormalized"]): TelegramInboundMedia { + const kindRaw = String(normalized.mediaKind ?? "").trim().toLowerCase(); + const kind: TelegramInboundMedia["kind"] = + kindRaw === "voice" || kindRaw === "audio" || kindRaw === "video_note" + ? kindRaw + : null; + + const fileId = asString(normalized.mediaFileId); + const durationRaw = normalized.mediaDurationSec; + const durationParsed = + typeof durationRaw === "number" + ? durationRaw + : typeof durationRaw === "string" + ? Number(durationRaw) + : Number.NaN; + const durationSec = + Number.isFinite(durationParsed) && durationParsed > 0 + ? Math.max(1, Math.round(durationParsed)) + : null; + + const label = asString(normalized.mediaTitle); + return { kind, fileId, durationSec, label }; +} + +function fallbackTextFromMedia(media: TelegramInboundMedia) { + if (!media.kind) return null; + if (media.kind === "voice") return "[voice message]"; + if (media.kind === "video_note") return "[video note]"; + if (media.label) return `[audio] ${media.label}`; + return "[audio]"; +} + function parseOccurredAt(input: string | null | undefined) { const d = new Date(String(input ?? "")); if (Number.isNaN(d.getTime())) return new Date(); @@ -338,7 +378,11 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) { } const businessConnectionId = String(n.businessConnectionId ?? "").trim() || null; - const text = normalizeText(n.text); + const media = parseTelegramInboundMedia(n); + const text = normalizeText(asString(n.text) ?? fallbackTextFromMedia(media)); + const isAudioLike = Boolean(media.fileId) && (media.kind === "voice" || media.kind === "audio" || media.kind === "video_note"); + const contactMessageKind: "MESSAGE" | "CALL" = isAudioLike ? "CALL" : "MESSAGE"; + const contactMessageAudioUrl = isAudioLike ? `${TELEGRAM_AUDIO_FILE_MARKER}${media.fileId}` : null; const occurredAt = parseOccurredAt(env.occurredAt); const direction = safeDirection(env.direction); const contactProfile = buildContactProfile(n, externalContactId); @@ -376,6 +420,10 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) { threadExternalId: externalChatId, contactExternalId: externalContactId, businessConnectionId, + mediaKind: media.kind, + mediaFileId: media.fileId, + mediaDurationSec: media.durationSec, + mediaLabel: media.label, }, payloadNormalized: n, payloadRaw: (env.payloadRaw ?? null) as Prisma.InputJsonValue, @@ -431,10 +479,12 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) { data: { contactId, contactInboxId: inbox.id, - kind: "MESSAGE", + kind: contactMessageKind, direction, channel: "TELEGRAM", content: text, + audioUrl: contactMessageAudioUrl, + durationSec: media.durationSec, occurredAt, }, }); diff --git a/omni_inbound/src/telegram.ts b/omni_inbound/src/telegram.ts index c8dc3ee..834c8b1 100644 --- a/omni_inbound/src/telegram.ts +++ b/omni_inbound/src/telegram.ts @@ -51,6 +51,69 @@ function normalizeString(value: unknown) { return normalized || null; } +function normalizeNumber(value: unknown) { + if (typeof value === "number" && Number.isFinite(value)) return value; + if (typeof value === "string") { + const parsed = Number(value); + if (Number.isFinite(parsed)) return parsed; + } + return null; +} + +type TelegramMediaInfo = { + kind: "voice" | "audio" | "video_note" | null; + fileId: string | null; + durationSec: number | null; + mimeType: string | null; + title: string | null; +}; + +function pickTelegramMedia(message: JsonObject): TelegramMediaInfo { + const voice = asObject(message.voice); + if (Object.keys(voice).length > 0) { + return { + kind: "voice", + fileId: normalizeString(voice.file_id), + durationSec: normalizeNumber(voice.duration), + mimeType: normalizeString(voice.mime_type), + title: "Voice message", + }; + } + + const audio = asObject(message.audio); + if (Object.keys(audio).length > 0) { + const performer = normalizeString(audio.performer); + const title = normalizeString(audio.title) ?? normalizeString(audio.file_name); + const combinedTitle = performer && title ? `${performer} - ${title}` : title ?? performer; + return { + kind: "audio", + fileId: normalizeString(audio.file_id), + durationSec: normalizeNumber(audio.duration), + mimeType: normalizeString(audio.mime_type), + title: combinedTitle, + }; + } + + const videoNote = asObject(message.video_note); + if (Object.keys(videoNote).length > 0) { + return { + kind: "video_note", + fileId: normalizeString(videoNote.file_id), + durationSec: normalizeNumber(videoNote.duration), + mimeType: null, + title: "Video note", + }; + } + + return { + kind: null, + fileId: null, + durationSec: null, + mimeType: null, + title: null, + }; +} + function detectDirection(message: JsonObject, chat: JsonObject, from: JsonObject): "IN" | "OUT" { if (typeof message.outgoing === "boolean") return message.outgoing ? "OUT" : "IN"; if (typeof message.is_outgoing === "boolean") return message.is_outgoing ? "OUT" : "IN"; @@ -113,7 +176,19 @@ export function parseTelegramBusinessUpdate(raw: unknown): OmniInboundEnvelopeV1 ? String(fallbackContactSource.id) : null; - const text = cropText(message.text) ?? cropText(message.caption); + const media = pickTelegramMedia(message); + const text = + cropText(message.text) ?? + cropText(message.caption) ?? + (media.kind === "voice" + ? "[voice message]" + : media.kind === "video_note" + ? "[video note]" + : media.kind === "audio" + ? media.title + ? `[audio] ${media.title}` + : "[audio]" + : null); const businessConnectionId = message.business_connection_id != null @@ -148,6 +223,11 @@ export function parseTelegramBusinessUpdate(raw: unknown): OmniInboundEnvelopeV1 contactExternalId, text, businessConnectionId, + mediaKind: media.kind, + mediaFileId: media.fileId, + mediaDurationSec: media.durationSec, + mediaMimeType: media.mimeType, + mediaTitle: media.title, updateId: updateId != null ? String(updateId) : null, chatTitle: typeof chat.title === "string" ? chat.title : null, chatUsername: normalizeString(chat.username),