import { Queue, Worker, JobsOptions } from "bullmq";
import { getRedis } from "../utils/redis";
import { prisma } from "../utils/prisma";
import { telegramBotApi } from "../utils/telegram";

/**
 * Name of the BullMQ queue that carries outbound Telegram messages.
 *
 * NOTE(review): newer BullMQ releases appear to reject queue names that
 * contain ":" — confirm against the pinned bullmq version before upgrading.
 * The value is left untouched here because it determines the Redis keys of
 * jobs that may already be enqueued.
 */
export const TELEGRAM_SEND_QUEUE_NAME = "telegram:send";

/** Payload of a send job: the id of the `omniMessage` row to deliver. */
type TelegramSendJob = {
  omniMessageId: string;
};

/**
 * Creates a new queue handle for {@link TELEGRAM_SEND_QUEUE_NAME}.
 *
 * Every call builds a fresh `Queue`; the caller owns the returned instance
 * and is responsible for closing it. Completed jobs are trimmed to the last
 * 1000 and failed jobs to the last 5000.
 *
 * @returns A new BullMQ `Queue` bound to the connection from `getRedis()`.
 */
export function telegramSendQueue() {
  return new Queue(TELEGRAM_SEND_QUEUE_NAME, {
    connection: getRedis(),
    defaultJobOptions: {
      removeOnComplete: { count: 1000 },
      removeOnFail: { count: 5000 },
    },
  });
}

// Lazily created queue handle reused by every enqueueTelegramSend() call.
let sharedSendQueue: ReturnType<typeof telegramSendQueue> | undefined;

/** Returns the module-wide queue handle, creating it on first use. */
function getSharedSendQueue() {
  if (sharedSendQueue === undefined) {
    sharedSendQueue = telegramSendQueue();
  }
  return sharedSendQueue;
}

/** Extracts the text stored for a failure, mirroring `String(e?.message || e)`. */
function describeError(err: unknown): string {
  const message = (err as { message?: unknown } | null | undefined)?.message;
  return String(message || err);
}

/**
 * Enqueues the delivery of one outbound Telegram message.
 *
 * The job id defaults to the message id, so adding the same message twice
 * does not create a second job. By default a job is attempted up to 10 times
 * with exponential backoff starting at 1000 ms.
 *
 * A single shared queue handle is reused across calls; previously each call
 * constructed a new `Queue` that was never closed.
 *
 * @param input - Identifies the `omniMessage` row to send.
 * @param opts - Optional BullMQ job options; these override the defaults
 *   above, including `jobId`.
 * @returns The promise returned by `Queue.add` for the new job.
 */
export async function enqueueTelegramSend(input: TelegramSendJob, opts?: JobsOptions) {
  const q = getSharedSendQueue();
  return q.add("send", input, {
    jobId: input.omniMessageId, // idempotency
    attempts: 10,
    backoff: { type: "exponential", delay: 1000 },
    ...opts,
  });
}

/**
 * Starts the worker that delivers queued messages through the Telegram Bot API.
 *
 * For each job the worker:
 * 1. loads the `omniMessage` (with its thread) and completes silently when the
 *    row is missing or is already `SENT` with a provider message id;
 * 2. throws when the row is not an outbound Telegram message;
 * 3. calls `sendMessage` and checkpoints the API result in the job data;
 * 4. stores `SENT`, the provider message id and the raw API result.
 *
 * When the Telegram call fails on the last configured attempt, the row is
 * marked `FAILED` with the error text and attempt count. The original send
 * error is always the one rethrown to BullMQ.
 *
 * @returns The running BullMQ `Worker`.
 */
export function startTelegramSendWorker() {
  return new Worker(
    TELEGRAM_SEND_QUEUE_NAME,
    async (job) => {
      const msg = await prisma.omniMessage.findUnique({
        where: { id: job.data.omniMessageId },
        include: { thread: true },
      });
      // Row is missing: nothing to deliver, let the job complete.
      if (!msg) return;

      // Idempotency: if we already sent it, don't send twice.
      if (msg.status === "SENT" && msg.providerMessageId) return;

      if (msg.channel !== "TELEGRAM" || msg.direction !== "OUT") {
        throw new Error(`Invalid omni message for telegram send: ${msg.id}`);
      }

      const { thread } = msg;
      const businessConnectionId = thread.businessConnectionId || undefined;

      // Marks the row FAILED once retries are exhausted. A failure of this
      // bookkeeping write is logged instead of thrown so that it cannot
      // replace the send error that BullMQ should record for the job.
      const recordFinalFailure = async (sendError: unknown) => {
        const attemptNumber = job.attemptsMade + 1;
        const isLastAttempt =
          typeof job.opts.attempts === "number" && attemptNumber >= job.opts.attempts;
        if (!isLastAttempt) return;
        try {
          await prisma.omniMessage.update({
            where: { id: msg.id },
            data: {
              status: "FAILED",
              rawJson: {
                error: describeError(sendError),
                attemptsMade: attemptNumber,
              },
            },
          });
        } catch (markError: unknown) {
          console.error(
            `Could not mark omni message ${msg.id} as FAILED: ${describeError(markError)}`,
          );
        }
      };

      // Performs the actual Telegram call; only failures of this call count
      // as a failed delivery.
      const send = async () => {
        try {
          return await telegramBotApi("sendMessage", {
            chat_id: thread.externalChatId,
            text: msg.text,
            ...(businessConnectionId ? { business_connection_id: businessConnectionId } : {}),
          });
        } catch (e: unknown) {
          await recordFinalFailure(e);
          throw e;
        }
      };

      // Sends the message, then checkpoints the API result in the job data.
      // If persisting to the database fails afterwards, the retry finds the
      // checkpoint and skips the send instead of delivering a duplicate.
      const sendAndCheckpoint = async () => {
        const sent = await send();
        try {
          await job.updateData({ ...job.data, delivered: { result: sent } });
        } catch (checkpointError: unknown) {
          // Best effort: without the checkpoint a later retry may re-send.
          console.warn(
            `Could not checkpoint telegram send for omni message ${msg.id}: ${describeError(checkpointError)}`,
          );
        }
        return sent;
      };

      // Set by an earlier attempt whose send succeeded but whose DB write failed.
      const checkpoint = job.data.delivered;
      const result = checkpoint ? checkpoint.result : await sendAndCheckpoint();

      const providerMessageId = result?.message_id != null ? String(result.message_id) : null;

      // A failure here propagates and triggers a retry, which only repeats
      // this write. The row is deliberately not marked FAILED, because the
      // message has already been delivered.
      await prisma.omniMessage.update({
        where: { id: msg.id },
        data: {
          status: "SENT",
          providerMessageId: providerMessageId,
          rawJson: result,
        },
      });
    },
    { connection: getRedis() },
  );
}