From 052f37d0ecd187aa1338fa35b2ef53689253bf32 Mon Sep 17 00:00:00 2001 From: Ruslan Bakiev Date: Sat, 21 Feb 2026 16:27:04 +0700 Subject: [PATCH] feat: add telegram business connect onboarding and status sync --- .../telegram/business/connect/refresh.post.ts | 71 +++++++++ .../telegram/business/connect/start.post.ts | 51 ++++++ .../telegram/business/connect/status.get.ts | 60 +++++++ .../omni/telegram/business/webhook.post.ts | 130 +++++++++++++++ .../server/utils/telegramBusinessConnect.ts | 150 ++++++++++++++++++ omni_inbound/README.md | 1 + omni_inbound/src/server.ts | 40 ++++- 7 files changed, 501 insertions(+), 2 deletions(-) create mode 100644 frontend/server/api/omni/telegram/business/connect/refresh.post.ts create mode 100644 frontend/server/api/omni/telegram/business/connect/start.post.ts create mode 100644 frontend/server/api/omni/telegram/business/connect/status.get.ts create mode 100644 frontend/server/api/omni/telegram/business/webhook.post.ts create mode 100644 frontend/server/utils/telegramBusinessConnect.ts diff --git a/frontend/server/api/omni/telegram/business/connect/refresh.post.ts b/frontend/server/api/omni/telegram/business/connect/refresh.post.ts new file mode 100644 index 0000000..fb20f58 --- /dev/null +++ b/frontend/server/api/omni/telegram/business/connect/refresh.post.ts @@ -0,0 +1,71 @@ +import { readBody } from "h3"; +import { getAuthContext } from "../../../../../utils/auth"; +import { prisma } from "../../../../../utils/prisma"; +import { telegramBotApi } from "../../../../../utils/telegram"; + +type RefreshBody = { + businessConnectionId?: string; +}; + +function mapFlags(raw: any) { + const isEnabled = typeof raw?.is_enabled === "boolean" ? raw.is_enabled : null; + const canReply = typeof raw?.can_reply === "boolean" + ? raw.can_reply + : typeof raw?.rights?.can_reply === "boolean" + ? raw.rights.can_reply + : null; + return { isEnabled, canReply }; +} + +export default defineEventHandler(async (event) => { + const auth = await getAuthContext(event); + const body = await readBody(event); + + const businessConnectionId = String(body?.businessConnectionId ?? "").trim(); + if (!businessConnectionId) { + throw createError({ statusCode: 400, statusMessage: "businessConnectionId is required" }); + } + + const existing = await prisma.telegramBusinessConnection.findFirst({ + where: { + teamId: auth.teamId, + businessConnectionId, + }, + select: { id: true }, + }); + if (!existing) { + throw createError({ statusCode: 404, statusMessage: "business connection not found" }); + } + + const response = await telegramBotApi("getBusinessConnection", { business_connection_id: businessConnectionId }); + const { isEnabled, canReply } = mapFlags(response); + + const updated = await prisma.telegramBusinessConnection.update({ + where: { id: existing.id }, + data: { + isEnabled, + canReply, + rawJson: { + state: "connected", + refreshedAt: new Date().toISOString(), + businessConnection: response, + }, + }, + select: { + businessConnectionId: true, + isEnabled: true, + canReply: true, + updatedAt: true, + }, + }); + + return { + ok: true, + connection: { + businessConnectionId: updated.businessConnectionId, + isEnabled: updated.isEnabled, + canReply: updated.canReply, + updatedAt: updated.updatedAt.toISOString(), + }, + }; +}); diff --git a/frontend/server/api/omni/telegram/business/connect/start.post.ts b/frontend/server/api/omni/telegram/business/connect/start.post.ts new file mode 100644 index 0000000..8ee8913 --- /dev/null +++ b/frontend/server/api/omni/telegram/business/connect/start.post.ts @@ -0,0 +1,51 @@ +import { getAuthContext } from "../../../../../utils/auth"; +import { prisma } from "../../../../../utils/prisma"; +import { buildTelegramStartUrl, issueLinkToken } from "../../../../../utils/telegramBusinessConnect"; + +export default defineEventHandler(async (event) => { + const auth = await getAuthContext(event); + const { token, payload } = issueLinkToken({ teamId: auth.teamId, userId: auth.userId }); + + const pendingId = `pending:${payload.nonce}`; + await prisma.telegramBusinessConnection.upsert({ + where: { + teamId_businessConnectionId: { + teamId: auth.teamId, + businessConnectionId: pendingId, + }, + }, + create: { + teamId: auth.teamId, + businessConnectionId: pendingId, + rawJson: { + state: "pending_link", + link: { + nonce: payload.nonce, + exp: payload.exp, + createdAt: new Date().toISOString(), + createdByUserId: auth.userId, + }, + }, + }, + update: { + isEnabled: null, + canReply: null, + rawJson: { + state: "pending_link", + link: { + nonce: payload.nonce, + exp: payload.exp, + createdAt: new Date().toISOString(), + createdByUserId: auth.userId, + }, + }, + }, + }); + + return { + ok: true, + status: "pending_link", + connectUrl: buildTelegramStartUrl(token), + expiresAt: new Date(payload.exp * 1000).toISOString(), + }; +}); diff --git a/frontend/server/api/omni/telegram/business/connect/status.get.ts b/frontend/server/api/omni/telegram/business/connect/status.get.ts new file mode 100644 index 0000000..c1d3190 --- /dev/null +++ b/frontend/server/api/omni/telegram/business/connect/status.get.ts @@ -0,0 +1,60 @@ +import { getAuthContext } from "../../../../../utils/auth"; +import { prisma } from "../../../../../utils/prisma"; + +function normalizeStatus(input: { + pendingCount: number; + linkedPendingCount: number; + connectedCount: number; + enabledCount: number; + replyEnabledCount: number; +}) { + if (input.connectedCount > 0) { + if (input.replyEnabledCount > 0 && input.enabledCount > 0) return "connected"; + if (input.enabledCount === 0) return "disabled"; + return "no_reply_rights"; + } + if (input.linkedPendingCount > 0) return "pending_business_connection"; + if (input.pendingCount > 0) return "pending_link"; + return "not_connected"; +} + +export default defineEventHandler(async (event) => { + const auth = await getAuthContext(event); + const rows = await prisma.telegramBusinessConnection.findMany({ + where: { teamId: auth.teamId }, + orderBy: { updatedAt: "desc" }, + take: 50, + }); + + const pending = rows.filter((r) => r.businessConnectionId.startsWith("pending:")); + const active = rows.filter((r) => !r.businessConnectionId.startsWith("pending:")); + + const linkedPendingCount = pending.filter((r) => { + const raw = (r.rawJson ?? {}) as any; + return Boolean(raw?.link?.telegramUserId || raw?.link?.chatId); + }).length; + + const enabledCount = active.filter((r) => r.isEnabled !== false).length; + const replyEnabledCount = active.filter((r) => r.canReply === true).length; + + const status = normalizeStatus({ + pendingCount: pending.length, + linkedPendingCount, + connectedCount: active.length, + enabledCount, + replyEnabledCount, + }); + + return { + ok: true, + status, + pendingCount: pending.length, + connectedCount: active.length, + connections: active.map((r) => ({ + businessConnectionId: r.businessConnectionId, + isEnabled: r.isEnabled, + canReply: r.canReply, + updatedAt: r.updatedAt.toISOString(), + })), + }; +}); diff --git a/frontend/server/api/omni/telegram/business/webhook.post.ts b/frontend/server/api/omni/telegram/business/webhook.post.ts new file mode 100644 index 0000000..7e4c84c --- /dev/null +++ b/frontend/server/api/omni/telegram/business/webhook.post.ts @@ -0,0 +1,130 @@ +import { getHeader, readBody } from "h3"; +import { prisma } from "../../../../utils/prisma"; +import { + extractLinkTokenFromStartText, + getBusinessConnectionFromUpdate, + getTelegramChatIdFromUpdate, + verifyLinkToken, +} from "../../../../utils/telegramBusinessConnect"; + +function hasValidSecret(event: any) { + const expected = String(process.env.TELEGRAM_WEBHOOK_SECRET || "").trim(); + if (!expected) return true; + const incoming = String(getHeader(event, "x-telegram-bot-api-secret-token") || "").trim(); + return incoming !== "" && incoming === expected; +} + +function pickStartText(update: any): string | null { + const text = + update?.message?.text ?? + update?.business_message?.text ?? + update?.edited_business_message?.text ?? + null; + if (typeof text !== "string") return null; + return text; +} + +export default defineEventHandler(async (event) => { + if (!hasValidSecret(event)) { + throw createError({ statusCode: 401, statusMessage: "invalid webhook secret" }); + } + + const update = await readBody(event); + const nowIso = new Date().toISOString(); + + const startText = pickStartText(update); + const linkToken = startText ? extractLinkTokenFromStartText(startText) : null; + if (linkToken) { + const payload = verifyLinkToken(linkToken); + if (!payload) return { ok: true, accepted: false, reason: "invalid_or_expired_link_token" }; + + const pendingId = `pending:${payload.nonce}`; + const chatId = getTelegramChatIdFromUpdate(update); + + await prisma.telegramBusinessConnection.updateMany({ + where: { + teamId: payload.teamId, + businessConnectionId: pendingId, + }, + data: { + rawJson: { + state: "pending_business_connection", + link: { + nonce: payload.nonce, + exp: payload.exp, + linkedAt: nowIso, + telegramUserId: chatId, + chatId, + }, + lastStartUpdate: update, + }, + }, + }); + + return { ok: true, accepted: true, type: "start_link" }; + } + + const businessConnection = getBusinessConnectionFromUpdate(update); + if (businessConnection) { + const pendingRows = await prisma.telegramBusinessConnection.findMany({ + where: { + businessConnectionId: { + startsWith: "pending:", + }, + }, + orderBy: { updatedAt: "desc" }, + take: 200, + }); + + const matchedPending = pendingRows.find((row) => { + const raw = (row.rawJson ?? {}) as any; + const linkedTelegramUserId = raw?.link?.telegramUserId != null ? String(raw.link.telegramUserId) : null; + if (!businessConnection.userChatId) return false; + return linkedTelegramUserId === businessConnection.userChatId; + }); + + if (!matchedPending) { + return { ok: true, accepted: false, reason: "team_not_linked_for_business_connection" }; + } + + await prisma.$transaction([ + prisma.telegramBusinessConnection.upsert({ + where: { + teamId_businessConnectionId: { + teamId: matchedPending.teamId, + businessConnectionId: businessConnection.id, + }, + }, + create: { + teamId: matchedPending.teamId, + businessConnectionId: businessConnection.id, + isEnabled: businessConnection.isEnabled, + canReply: businessConnection.canReply, + rawJson: { + state: "connected", + connectedAt: nowIso, + userChatId: businessConnection.userChatId, + businessConnection: businessConnection.raw, + update, + }, + }, + update: { + isEnabled: businessConnection.isEnabled, + canReply: businessConnection.canReply, + rawJson: { + state: "connected", + connectedAt: nowIso, + userChatId: businessConnection.userChatId, + businessConnection: businessConnection.raw, + update, + }, + }, + }), + prisma.telegramBusinessConnection.delete({ where: { id: matchedPending.id } }), + ]); + + return { ok: true, accepted: true, type: "business_connection" }; + } + + return { ok: true, accepted: true, type: "ignored" }; +}); diff --git a/frontend/server/utils/telegramBusinessConnect.ts b/frontend/server/utils/telegramBusinessConnect.ts new file mode 100644 index 0000000..535be47 --- /dev/null +++ b/frontend/server/utils/telegramBusinessConnect.ts @@ -0,0 +1,150 @@ +import { createHmac, randomBytes, timingSafeEqual } from "node:crypto"; + +export type LinkTokenPayloadV1 = { + v: 1; + teamId: string; + userId: string; + nonce: string; + exp: number; +}; + +const TOKEN_TTL_SEC = Number(process.env.TELEGRAM_LINK_TOKEN_TTL_SEC || 10 * 60); + +function base64UrlEncode(input: Buffer | string) { + return Buffer.from(input) + .toString("base64") + .replace(/\+/g, "-") + .replace(/\//g, "_") + .replace(/=+$/g, ""); +} + +function base64UrlDecode(input: string) { + const normalized = input.replace(/-/g, "+").replace(/_/g, "/"); + const pad = normalized.length % 4; + const padded = pad === 0 ? normalized : normalized + "=".repeat(4 - pad); + return Buffer.from(padded, "base64"); +} + +export function requireLinkSecret() { + const secret = String(process.env.TELEGRAM_LINK_SECRET || process.env.TELEGRAM_BOT_TOKEN || "").trim(); + if (!secret) { + throw createError({ statusCode: 500, statusMessage: "TELEGRAM_LINK_SECRET or TELEGRAM_BOT_TOKEN is required" }); + } + return secret; +} + +export function requireBotUsername() { + const botUsername = String(process.env.TELEGRAM_BOT_USERNAME || "").trim().replace(/^@/, ""); + if (!botUsername) { + throw createError({ statusCode: 500, statusMessage: "TELEGRAM_BOT_USERNAME is required" }); + } + return botUsername; +} + +function sign(input: string, secret: string) { + return createHmac("sha256", secret).update(input).digest(); +} + +export function issueLinkToken(input: { teamId: string; userId: string }) { + const secret = requireLinkSecret(); + const payload: LinkTokenPayloadV1 = { + v: 1, + teamId: input.teamId, + userId: input.userId, + nonce: randomBytes(12).toString("hex"), + exp: Math.floor(Date.now() / 1000) + Math.max(60, TOKEN_TTL_SEC), + }; + const payloadRaw = JSON.stringify(payload); + const payloadEncoded = base64UrlEncode(payloadRaw); + const sig = base64UrlEncode(sign(payloadEncoded, secret)); + const token = `${payloadEncoded}.${sig}`; + return { token, payload }; +} + +export function verifyLinkToken(token: string): LinkTokenPayloadV1 | null { + const raw = String(token || "").trim(); + if (!raw) return null; + + const dotIdx = raw.indexOf("."); + if (dotIdx <= 0 || dotIdx >= raw.length - 1) return null; + + const payloadEncoded = raw.slice(0, dotIdx); + const sigEncoded = raw.slice(dotIdx + 1); + + try { + const secret = requireLinkSecret(); + const expected = sign(payloadEncoded, secret); + const actual = base64UrlDecode(sigEncoded); + if (actual.length !== expected.length || !timingSafeEqual(actual, expected)) { + return null; + } + + const payload = JSON.parse(base64UrlDecode(payloadEncoded).toString("utf8")) as LinkTokenPayloadV1; + if (!payload || payload.v !== 1) return null; + if (!payload.teamId || !payload.userId || !payload.nonce || !payload.exp) return null; + if (Math.floor(Date.now() / 1000) > payload.exp) return null; + return payload; + } catch { + return null; + } +} + +export function extractLinkTokenFromStartText(text: string) { + const trimmed = String(text || "").trim(); + if (!trimmed.startsWith("/start")) return null; + const parts = trimmed.split(/\s+/).filter(Boolean); + if (parts.length < 2) return null; + const arg = parts[1] || ""; + if (!arg.startsWith("link_")) return null; + return arg.slice("link_".length); +} + +export function buildTelegramStartUrl(token: string) { + const botUsername = requireBotUsername(); + return `https://t.me/${botUsername}?start=link_${token}`; +} + +export function getTelegramChatIdFromUpdate(update: any): string | null { + const candidates = [ + update?.message?.chat?.id, + update?.business_message?.chat?.id, + update?.edited_business_message?.chat?.id, + update?.business_connection?.user_chat_id, + ]; + for (const c of candidates) { + if (c == null) continue; + const v = String(c).trim(); + if (v) return v; + } + return null; +} + +export function getBusinessConnectionFromUpdate(update: any): { + id: string; + userChatId: string | null; + isEnabled: boolean | null; + canReply: boolean | null; + raw: any; +} | null { + const bc = (update?.business_connection ?? null) as any; + if (!bc || typeof bc !== "object") return null; + + const id = String(bc.id ?? "").trim(); + if (!id) return null; + + const userChatId = bc.user_chat_id != null ? String(bc.user_chat_id) : null; + const isEnabled = typeof bc.is_enabled === "boolean" ? bc.is_enabled : null; + const canReply = typeof bc.can_reply === "boolean" + ? bc.can_reply + : typeof bc.rights?.can_reply === "boolean" + ? bc.rights.can_reply + : null; + + return { + id, + userChatId, + isEnabled, + canReply, + raw: bc, + }; +} diff --git a/omni_inbound/README.md b/omni_inbound/README.md index 4240043..f8482aa 100644 --- a/omni_inbound/README.md +++ b/omni_inbound/README.md @@ -31,6 +31,7 @@ - `RECEIVER_FLOW_QUEUE_NAME` (default: `receiver.flow`) - `INBOUND_QUEUE_NAME` (legacy alias, optional) - `TELEGRAM_WEBHOOK_SECRET` (optional, но обязателен для production) +- `TELEGRAM_CONNECT_WEBHOOK_FORWARD_URL` (optional; URL CRM endpoint для линковки Telegram Business) - `MAX_BODY_SIZE_BYTES` (default: `1048576`) ## Запуск diff --git a/omni_inbound/src/server.ts b/omni_inbound/src/server.ts index 8f0005f..55b3b8f 100644 --- a/omni_inbound/src/server.ts +++ b/omni_inbound/src/server.ts @@ -37,6 +37,35 @@ function validateTelegramSecret(req: IncomingMessage): boolean { return incoming !== "" && incoming === expected; } +async function forwardTelegramConnectWebhook(rawBody: unknown) { + const url = (process.env.TELEGRAM_CONNECT_WEBHOOK_FORWARD_URL || "").trim(); + if (!url) return; + + const headers: Record = { + "content-type": "application/json", + }; + const secret = (process.env.TELEGRAM_WEBHOOK_SECRET || "").trim(); + if (secret) { + headers["x-telegram-bot-api-secret-token"] = secret; + } + + try { + const res = await fetch(url, { + method: "POST", + headers, + body: JSON.stringify(rawBody ?? {}), + }); + + if (!res.ok) { + const text = await res.text().catch(() => ""); + console.warn(`[omni_inbound] telegram connect forward failed: ${res.status} ${text.slice(0, 300)}`); + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.warn(`[omni_inbound] telegram connect forward error: ${message}`); + } +} + export function startServer() { const port = Number(process.env.PORT || 8080); @@ -62,12 +91,17 @@ export function startServer() { return; } + let body: unknown = {}; + let envelope: ReturnType | null = null; + try { - const body = await readJsonBody(req); - const envelope = parseTelegramBusinessUpdate(body); + body = await readJsonBody(req); + envelope = parseTelegramBusinessUpdate(body); await enqueueInboundEvent(envelope); + void forwardTelegramConnectWebhook(body); + writeJson(res, 200, { ok: true, queued: true, @@ -77,6 +111,8 @@ export function startServer() { }); } catch (error) { if (isDuplicateJobError(error)) { + void forwardTelegramConnectWebhook(body); + writeJson(res, 200, { ok: true, queued: false,