93 lines
2.6 KiB
TypeScript
93 lines
2.6 KiB
TypeScript
import { Queue, Worker, JobsOptions } from "bullmq";
|
|
import { getRedis } from "../utils/redis";
|
|
import { prisma } from "../utils/prisma";
|
|
import { telegramBotApi } from "../utils/telegram";
|
|
|
|
/**
 * Name of the BullMQ queue that carries outbound Telegram send jobs.
 * Used both when constructing the producer-side `Queue` and the `Worker`
 * in this file, so the two always agree on the queue.
 *
 * NOTE(review): some BullMQ releases reject queue names that contain ":" —
 * confirm against the installed BullMQ version before upgrading.
 */
export const TELEGRAM_SEND_QUEUE_NAME = "telegram:send";
|
|
|
|
/**
 * Payload of a Telegram send job.
 *
 * The job deliberately carries only a reference; the worker loads the message
 * row (text, channel, thread) from the database when the job runs.
 */
type TelegramSendJob = {
  /**
   * Primary key of the `omniMessage` row to deliver. Also used as the default
   * BullMQ job id when enqueueing.
   */
  omniMessageId: string;
};
|
|
|
|
/**
 * Builds a producer-side handle for the Telegram send queue.
 *
 * Every call constructs a NEW `Queue` instance on the connection returned by
 * `getRedis()`; callers that enqueue repeatedly should hold on to the returned
 * instance instead of calling this function per job.
 *
 * Default retention: the last 1000 completed and the last 5000 failed jobs are
 * kept, older ones are removed.
 *
 * @returns A queue typed for `TelegramSendJob` payloads.
 */
export function telegramSendQueue() {
  const retention = {
    removeOnComplete: { count: 1000 },
    removeOnFail: { count: 5000 },
  };

  const queue = new Queue<TelegramSendJob>(TELEGRAM_SEND_QUEUE_NAME, {
    connection: getRedis(),
    defaultJobOptions: retention,
  });

  return queue;
}
|
|
|
|
/**
 * Queue handle shared by all `enqueueTelegramSend` calls, created on first use.
 *
 * Building a fresh `Queue` per enqueue (and never closing it) accumulates
 * queue objects for the lifetime of the process, so the producer path reuses
 * a single instance instead.
 */
let sharedTelegramSendQueue: ReturnType<typeof telegramSendQueue> | undefined;

/**
 * Schedules delivery of one outbound Telegram message.
 *
 * Defaults applied to the job:
 * - `jobId` is the message id, so the same message is not queued twice while
 *   a job with that id still exists (idempotency);
 * - up to 10 attempts with exponential backoff starting from a 1000 ms delay.
 *
 * `opts` is spread last, so a caller can override any of these defaults —
 * including `jobId`, which gives up the idempotency guarantee for that call.
 *
 * @param input Reference to the `omniMessage` row to send.
 * @param opts  Optional BullMQ job options merged over the defaults.
 * @returns The promise returned by `Queue.add` for the scheduled job.
 */
export async function enqueueTelegramSend(input: TelegramSendJob, opts?: JobsOptions) {
  if (!sharedTelegramSendQueue) {
    sharedTelegramSendQueue = telegramSendQueue();
  }

  return sharedTelegramSendQueue.add("send", input, {
    jobId: input.omniMessageId, // idempotency
    attempts: 10,
    backoff: { type: "exponential", delay: 1000 },
    ...opts,
  });
}
|
|
|
|
/**
 * Worker-side view of the job payload: the enqueued reference plus the raw
 * Telegram response stashed by an earlier attempt that delivered the message
 * but could not record that in the database.
 */
type TelegramSendJobState = TelegramSendJob & {
  // Opaque provider payload, stored verbatim as JSON; intentionally untyped.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  deliveredResult?: any;
};

/** Turns an arbitrary thrown value into the string stored with a failed message. */
function describeTelegramSendError(error: unknown): string {
  const message =
    typeof error === "object" && error !== null
      ? (error as { message?: unknown }).message
      : undefined;
  return String(message || error);
}

/**
 * Starts the BullMQ worker that delivers queued outbound Telegram messages.
 *
 * For each job the worker:
 * 1. loads the `omniMessage` row and its thread; a missing row or one already
 *    marked `SENT` with a provider message id is skipped;
 * 2. rejects rows that are not outbound Telegram messages;
 * 3. calls the Bot API `sendMessage` method (passing `business_connection_id`
 *    when the thread has one);
 * 4. records `SENT`, the provider message id and the raw response on the row.
 *
 * Failure handling:
 * - If the Telegram call fails, the error is rethrown so BullMQ can retry; on
 *   the final attempt the row is first marked `FAILED` with the error text.
 * - If the Telegram call succeeds but the database write fails, the response
 *   is stashed in the job data before rethrowing. The retry then only repeats
 *   the database write, so the recipient does not get the message twice, and
 *   the row is never marked `FAILED` for a message that was delivered.
 *
 * @returns The started `Worker` instance.
 * @throws Inside the processor: `Error` for a non-Telegram/non-outbound row,
 *         and any error raised by the Telegram call or the database.
 */
export function startTelegramSendWorker() {
  return new Worker<TelegramSendJob>(
    TELEGRAM_SEND_QUEUE_NAME,
    async (job) => {
      const msg = await prisma.omniMessage.findUnique({
        where: { id: job.data.omniMessageId },
        include: { thread: true },
      });
      // No such row: nothing to deliver.
      if (!msg) return;

      // Idempotency: if we already sent it, don't send twice.
      if (msg.status === "SENT" && msg.providerMessageId) return;

      if (msg.channel !== "TELEGRAM" || msg.direction !== "OUT") {
        throw new Error(`Invalid omni message for telegram send: ${msg.id}`);
      }

      const thread = msg.thread;
      const chatId = thread.externalChatId;
      const businessConnectionId = thread.businessConnectionId || undefined;

      // A previous attempt may already have delivered the message and stashed
      // Telegram's response; in that case skip straight to recording it.
      const state: TelegramSendJobState = job.data;
      let result = state.deliveredResult;
      const deliveredEarlier = result != null;

      if (!deliveredEarlier) {
        try {
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          result = await telegramBotApi<any>("sendMessage", {
            chat_id: chatId,
            text: msg.text,
            ...(businessConnectionId ? { business_connection_id: businessConnectionId } : {}),
          });
        } catch (e: unknown) {
          // A job without an explicit `attempts` option is tried once, so
          // treat a missing value as 1 rather than "never the last attempt".
          // Assumes `attemptsMade` counts the attempts finished before this
          // one, as the original check did.
          const maxAttempts = job.opts.attempts ?? 1;
          const attemptNumber = job.attemptsMade + 1;

          if (attemptNumber >= maxAttempts) {
            await prisma.omniMessage.update({
              where: { id: msg.id },
              data: {
                status: "FAILED",
                rawJson: {
                  error: describeTelegramSendError(e),
                  attemptsMade: attemptNumber,
                },
              },
            });
          }

          throw e;
        }
      }

      const providerMessageId = result?.message_id != null ? String(result.message_id) : null;

      try {
        await prisma.omniMessage.update({
          where: { id: msg.id },
          data: {
            status: "SENT",
            providerMessageId: providerMessageId,
            rawJson: result,
          },
        });
      } catch (e: unknown) {
        // The message is already out. Remember the response on the job so the
        // retry records it instead of calling Telegram again.
        if (!deliveredEarlier && result != null) {
          const nextData: TelegramSendJobState = { ...job.data, deliveredResult: result };
          try {
            await job.updateData(nextData);
          } catch {
            // Best effort only: the database error below is the failure that
            // must reach BullMQ, so it is not replaced by a stash error.
          }
        }

        throw e;
      }
    },
    { connection: getRedis() },
  );
}
|
|
|