feat(telegram): ingest and render inbound voice messages

This commit is contained in:
Ruslan Bakiev
2026-02-23 12:21:53 +07:00
parent c94c229a1a
commit acd974766a
4 changed files with 225 additions and 5 deletions

View File

@@ -0,0 +1,76 @@
import { getQuery, setHeader } from "h3";
import { getAuthContext } from "../../../utils/auth";
import { prisma } from "../../../utils/prisma";
import { requireTelegramBotToken, telegramApiBase, telegramBotApi } from "../../../utils/telegram";
/** Prefix that marks a stored `audioUrl` as a Telegram file id rather than a directly fetchable URL. */
const TELEGRAM_AUDIO_FILE_MARKER = "tg-file:";

/** Fields of the Telegram `getFile` result that this route reads. */
type TelegramFileMeta = {
  file_id: string;
  file_path?: string;
};

/**
 * Pulls the Telegram file id out of a marker-prefixed `audioUrl`.
 *
 * @param audioUrl Stored audio reference; `null`/`undefined` are treated as empty.
 * @returns The trimmed file id, or `null` when the value does not start with the
 *          marker or nothing follows it.
 */
function parseTelegramFileId(audioUrl: string | null | undefined) {
  const candidate = String(audioUrl ?? "").trim();
  const carriesMarker = candidate.startsWith(TELEGRAM_AUDIO_FILE_MARKER);
  const extracted = carriesMarker
    ? candidate.slice(TELEGRAM_AUDIO_FILE_MARKER.length).trim()
    : "";
  return extracted.length > 0 ? extracted : null;
}
/**
 * Streams the audio attached to a stored Telegram message back to the caller.
 *
 * Steps:
 *  1. Resolve the auth context and read `messageId` from the query string.
 *  2. Load the message, restricted to channel `TELEGRAM` and to contacts of the
 *     caller's team, selecting only `audioUrl`.
 *  3. Extract the Telegram file id from `audioUrl` and resolve its `file_path`
 *     through the Bot API `getFile` method.
 *  4. Download the file from Telegram and return the bytes.
 *
 * Errors raised via `createError`:
 *  - 400 when `messageId` is missing or blank;
 *  - 404 when no matching message exists or its `audioUrl` holds no file id;
 *  - 502 when Telegram returns no `file_path` or the download responds non-OK.
 */
export default defineEventHandler(async (event) => {
  const auth = await getAuthContext(event);
  const query = getQuery(event);
  const messageId = String(query.messageId ?? "").trim();
  if (!messageId) {
    throw createError({ statusCode: 400, statusMessage: "messageId is required" });
  }

  // The team filter keeps one team from reading another team's media by id.
  const message = await prisma.contactMessage.findFirst({
    where: {
      id: messageId,
      channel: "TELEGRAM",
      contact: { teamId: auth.teamId },
    },
    select: {
      audioUrl: true,
    },
  });
  if (!message) {
    throw createError({ statusCode: 404, statusMessage: "telegram message not found" });
  }

  const fileId = parseTelegramFileId(message.audioUrl);
  if (!fileId) {
    throw createError({ statusCode: 404, statusMessage: "telegram audio is missing" });
  }

  const meta = await telegramBotApi<TelegramFileMeta>("getFile", { file_id: fileId });
  const filePath = String(meta?.file_path ?? "").trim();
  if (!filePath) {
    throw createError({ statusCode: 502, statusMessage: "telegram file path is unavailable" });
  }

  // Strip trailing slashes so the joined download URL has exactly one separator.
  const apiBase = telegramApiBase().replace(/\/+$/, "");
  const token = requireTelegramBotToken();
  const remote = await fetch(`${apiBase}/file/bot${token}/${filePath}`);
  if (!remote.ok) {
    throw createError({ statusCode: 502, statusMessage: `telegram file fetch failed (${remote.status})` });
  }

  const contentType = remote.headers.get("content-type") || "audio/ogg";
  const body = Buffer.from(await remote.arrayBuffer());

  setHeader(event, "content-type", contentType);
  setHeader(event, "cache-control", "private, max-age=60");
  // Advertise the size of the bytes actually sent. The upstream content-length
  // describes the on-the-wire payload, which differs from the buffered body
  // whenever fetch transparently decoded a Content-Encoding.
  setHeader(event, "content-length", body.byteLength);
  return body;
});

View File

@@ -67,6 +67,7 @@ function extractOmniNormalizedText(rawJson: unknown, fallbackText = "") {
// Upper-case content-type tags accepted for client timeline entries;
// mapTimelineContentType converts them (e.g. "CALENDAR_EVENT" -> "calendar_event").
type ClientTimelineContentType = "CALENDAR_EVENT" | "DOCUMENT" | "RECOMMENDATION";
// NOTE(review): usage is outside the visible hunk — presumably prefixes a contact id
// inside a document scope string; confirm against callers.
const CONTACT_DOCUMENT_SCOPE_PREFIX = "contact:";
// Prefix identifying an `audioUrl` that holds a Telegram file reference instead of a URL;
// resolveContactMessageAudioUrl rewrites such values to the media proxy endpoint.
const TELEGRAM_AUDIO_FILE_MARKER = "tg-file:";
function mapTimelineContentType(value: ClientTimelineContentType) {
if (value === "CALENDAR_EVENT") return "calendar_event";
@@ -141,6 +142,19 @@ function visibleMessageWhere(hiddenInboxIds: string[]) {
};
}
/**
 * Produces the audio URL exposed to clients for a contact message.
 *
 * - Blank or missing `audioUrl` yields an empty string.
 * - A Telegram message whose `audioUrl` starts with the Telegram file marker is
 *   pointed at the media proxy endpoint, keyed by the message id.
 * - Anything else is returned trimmed and otherwise untouched.
 */
function resolveContactMessageAudioUrl(message: {
  id: string;
  channel: string;
  audioUrl: string | null;
}) {
  const stored = String(message.audioUrl ?? "").trim();
  const isTelegramFileRef =
    stored !== "" &&
    message.channel === "TELEGRAM" &&
    stored.startsWith(TELEGRAM_AUDIO_FILE_MARKER);

  if (isTelegramFileRef) {
    const encodedId = encodeURIComponent(message.id);
    return `/api/omni/telegram/media?messageId=${encodedId}`;
  }
  return stored;
}
async function upsertContactInbox(input: {
teamId: string;
contactId: string;
@@ -662,7 +676,7 @@ async function getDashboard(auth: AuthContext | null) {
kind: m.kind === "CALL" ? "call" : "message",
direction: m.direction === "IN" ? "in" : "out",
text: m.content,
audioUrl: "",
audioUrl: resolveContactMessageAudioUrl(m),
duration: m.durationSec ? new Date(m.durationSec * 1000).toISOString().slice(14, 19) : "",
transcript: Array.isArray(m.transcriptJson) ? ((m.transcriptJson as any) as string[]) : [],
deliveryStatus: resolveDeliveryStatus(m),
@@ -918,7 +932,7 @@ async function getClientTimeline(auth: AuthContext | null, contactIdInput: strin
kind: m.kind === "CALL" ? "call" : "message",
direction: m.direction === "IN" ? "in" : "out",
text: m.content,
audioUrl: "",
audioUrl: resolveContactMessageAudioUrl(m),
duration: m.durationSec ? new Date(m.durationSec * 1000).toISOString().slice(14, 19) : "",
transcript: Array.isArray(m.transcriptJson) ? ((m.transcriptJson as any) as string[]) : [],
deliveryStatus: resolveDeliveryStatus(m),

View File

@@ -28,6 +28,7 @@ type OmniInboundEnvelopeV1 = {
// Queue name, read from the RECEIVER_FLOW_QUEUE_NAME environment variable with
// "receiver.flow" as the fallback; surrounding whitespace is trimmed.
export const RECEIVER_FLOW_QUEUE_NAME = (process.env.RECEIVER_FLOW_QUEUE_NAME || "receiver.flow").trim();
// NOTE(review): usage is outside the visible hunk — confirm what this prefix is matched against.
const TELEGRAM_PLACEHOLDER_PREFIX = "Telegram ";
// Prepended to a Telegram media file id to form the `audioUrl` stored on the contact message.
const TELEGRAM_AUDIO_FILE_MARKER = "tg-file:";
function redisConnectionFromEnv(): ConnectionOptions {
const raw = (process.env.REDIS_URL || "redis://localhost:6379").trim();
@@ -47,6 +48,45 @@ function normalizeText(input: unknown) {
return t || "[no text]";
}
/**
 * Media details of an inbound Telegram message, as extracted from the normalized
 * payload by parseTelegramInboundMedia.
 */
type TelegramInboundMedia = {
// Recognized media kind; null when the payload's `mediaKind` is none of the three.
kind: "voice" | "audio" | "video_note" | null;
// Value of the payload's `mediaFileId` after passing through asString.
fileId: string | null;
// Duration rounded to whole seconds (minimum 1); null when absent, non-numeric or not positive.
durationSec: number | null;
// Value of the payload's `mediaTitle` after passing through asString.
label: string | null;
};
/**
 * Reads the media-related fields of a normalized inbound payload.
 *
 * @param normalized Normalized payload of the inbound envelope.
 * @returns The media kind (only "voice", "audio" or "video_note" are accepted,
 *          compared case-insensitively after trimming), the file id, the duration
 *          in whole seconds (at least 1, or `null` when missing/invalid/non-positive)
 *          and the label taken from `mediaTitle`.
 */
function parseTelegramInboundMedia(normalized: OmniInboundEnvelopeV1["payloadNormalized"]): TelegramInboundMedia {
  const rawKind = String(normalized.mediaKind ?? "").trim().toLowerCase();
  let kind: TelegramInboundMedia["kind"] = null;
  switch (rawKind) {
    case "voice":
    case "audio":
    case "video_note":
      kind = rawKind;
      break;
  }

  const fileId = asString(normalized.mediaFileId);

  // Accept a number as-is or a numeric string; anything else counts as "no duration".
  const rawDuration = normalized.mediaDurationSec;
  let seconds = Number.NaN;
  if (typeof rawDuration === "number") {
    seconds = rawDuration;
  } else if (typeof rawDuration === "string") {
    seconds = Number(rawDuration);
  }

  let durationSec: number | null = null;
  if (Number.isFinite(seconds) && seconds > 0) {
    // Sub-second clips would round to 0, so clamp to one second.
    durationSec = Math.max(1, Math.round(seconds));
  }

  const label = asString(normalized.mediaTitle);

  return { kind, fileId, durationSec, label };
}
/**
 * Builds placeholder text for a media message.
 *
 * @returns `null` when the media has no kind; "[voice message]" for voice;
 *          "[video note]" for video notes; otherwise "[audio] <label>" when a
 *          label is present, else "[audio]".
 */
function fallbackTextFromMedia(media: TelegramInboundMedia) {
  const { kind, label } = media;
  if (!kind) return null;

  switch (kind) {
    case "voice":
      return "[voice message]";
    case "video_note":
      return "[video note]";
    default:
      return label ? `[audio] ${label}` : "[audio]";
  }
}
function parseOccurredAt(input: string | null | undefined) {
const d = new Date(String(input ?? ""));
if (Number.isNaN(d.getTime())) return new Date();
@@ -338,7 +378,11 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) {
}
const businessConnectionId = String(n.businessConnectionId ?? "").trim() || null;
const text = normalizeText(n.text);
const media = parseTelegramInboundMedia(n);
const text = normalizeText(asString(n.text) ?? fallbackTextFromMedia(media));
const isAudioLike = Boolean(media.fileId) && (media.kind === "voice" || media.kind === "audio" || media.kind === "video_note");
const contactMessageKind: "MESSAGE" | "CALL" = isAudioLike ? "CALL" : "MESSAGE";
const contactMessageAudioUrl = isAudioLike ? `${TELEGRAM_AUDIO_FILE_MARKER}${media.fileId}` : null;
const occurredAt = parseOccurredAt(env.occurredAt);
const direction = safeDirection(env.direction);
const contactProfile = buildContactProfile(n, externalContactId);
@@ -376,6 +420,10 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) {
threadExternalId: externalChatId,
contactExternalId: externalContactId,
businessConnectionId,
mediaKind: media.kind,
mediaFileId: media.fileId,
mediaDurationSec: media.durationSec,
mediaLabel: media.label,
},
payloadNormalized: n,
payloadRaw: (env.payloadRaw ?? null) as Prisma.InputJsonValue,
@@ -431,10 +479,12 @@ async function ingestInbound(env: OmniInboundEnvelopeV1) {
data: {
contactId,
contactInboxId: inbox.id,
kind: "MESSAGE",
kind: contactMessageKind,
direction,
channel: "TELEGRAM",
content: text,
audioUrl: contactMessageAudioUrl,
durationSec: media.durationSec,
occurredAt,
},
});

View File

@@ -51,6 +51,69 @@ function normalizeString(value: unknown) {
return normalized || null;
}
/**
 * Coerces a loosely-typed value to a finite number.
 *
 * @param value A number, a numeric string, or anything else.
 * @returns The finite number; `null` for NaN/Infinity, non-numeric strings,
 *          blank strings and every other type.
 */
function normalizeNumber(value: unknown) {
  if (typeof value === "number") {
    return Number.isFinite(value) ? value : null;
  }
  if (typeof value === "string") {
    const trimmed = value.trim();
    // Number("") and Number("   ") evaluate to 0, which would report a blank
    // field as a real zero instead of "no value".
    if (trimmed === "") return null;
    const parsed = Number(trimmed);
    return Number.isFinite(parsed) ? parsed : null;
  }
  return null;
}
/** Media descriptor produced by pickTelegramMedia from a Telegram message object. */
type TelegramMediaInfo = {
// Which message field supplied the media (`voice`, `audio` or `video_note`); null when none is present.
kind: "voice" | "audio" | "video_note" | null;
// Normalized `file_id` of the selected media object.
fileId: string | null;
// Normalized `duration` of the selected media object.
durationSec: number | null;
// Normalized `mime_type`; always null for video notes.
mimeType: string | null;
// "Voice message" / "Video note" for those kinds; for audio, a title built from performer, title and file name.
title: string | null;
};
/**
 * Selects the audio-like media carried by a Telegram message.
 *
 * The message fields are checked in priority order — `voice`, then `audio`, then
 * `video_note` — and the first one that resolves to a non-empty object wins.
 *
 * @param message Telegram message object.
 * @returns A descriptor for the chosen media, or one with every field `null`
 *          when none of the three fields is populated.
 */
function pickTelegramMedia(message: JsonObject): TelegramMediaInfo {
  const isPopulated = (candidate: object) => Object.keys(candidate).length !== 0;

  const voiceNote = asObject(message.voice);
  if (isPopulated(voiceNote)) {
    return {
      kind: "voice",
      fileId: normalizeString(voiceNote.file_id),
      durationSec: normalizeNumber(voiceNote.duration),
      mimeType: normalizeString(voiceNote.mime_type),
      title: "Voice message",
    };
  }

  const track = asObject(message.audio);
  if (isPopulated(track)) {
    const artist = normalizeString(track.performer);
    // The file name only serves as a title when the track has no explicit one.
    const trackName = normalizeString(track.title) ?? normalizeString(track.file_name);
    let displayTitle = trackName ?? artist;
    if (artist && trackName) {
      displayTitle = `${artist} - ${trackName}`;
    }
    return {
      kind: "audio",
      fileId: normalizeString(track.file_id),
      durationSec: normalizeNumber(track.duration),
      mimeType: normalizeString(track.mime_type),
      title: displayTitle,
    };
  }

  const roundVideo = asObject(message.video_note);
  if (isPopulated(roundVideo)) {
    return {
      kind: "video_note",
      fileId: normalizeString(roundVideo.file_id),
      durationSec: normalizeNumber(roundVideo.duration),
      mimeType: null,
      title: "Video note",
    };
  }

  const noMedia: TelegramMediaInfo = {
    kind: null,
    fileId: null,
    durationSec: null,
    mimeType: null,
    title: null,
  };
  return noMedia;
}
function detectDirection(message: JsonObject, chat: JsonObject, from: JsonObject): "IN" | "OUT" {
if (typeof message.outgoing === "boolean") return message.outgoing ? "OUT" : "IN";
if (typeof message.is_outgoing === "boolean") return message.is_outgoing ? "OUT" : "IN";
@@ -113,7 +176,19 @@ export function parseTelegramBusinessUpdate(raw: unknown): OmniInboundEnvelopeV1
? String(fallbackContactSource.id)
: null;
const text = cropText(message.text) ?? cropText(message.caption);
const media = pickTelegramMedia(message);
const text =
cropText(message.text) ??
cropText(message.caption) ??
(media.kind === "voice"
? "[voice message]"
: media.kind === "video_note"
? "[video note]"
: media.kind === "audio"
? media.title
? `[audio] ${media.title}`
: "[audio]"
: null);
const businessConnectionId =
message.business_connection_id != null
@@ -148,6 +223,11 @@ export function parseTelegramBusinessUpdate(raw: unknown): OmniInboundEnvelopeV1
contactExternalId,
text,
businessConnectionId,
mediaKind: media.kind,
mediaFileId: media.fileId,
mediaDurationSec: media.durationSec,
mediaMimeType: media.mimeType,
mediaTitle: media.title,
updateId: updateId != null ? String(updateId) : null,
chatTitle: typeof chat.title === "string" ? chat.title : null,
chatUsername: normalizeString(chat.username),