import { Queue, Worker, type JobsOptions, type ConnectionOptions } from "bullmq"; import { Prisma } from "../generated/prisma/client"; import { prisma } from "../utils/prisma"; export const OUTBOUND_DELIVERY_QUEUE_NAME = ( process.env.SENDER_FLOW_QUEUE_NAME || process.env.OUTBOUND_DELIVERY_QUEUE_NAME || "sender.flow" ).trim(); export type OutboundDeliveryJob = { omniMessageId: string; endpoint: string; method?: "POST" | "PUT" | "PATCH"; headers?: Record; payload: unknown; channel?: string; provider?: string; }; function redisConnectionFromEnv(): ConnectionOptions { const raw = (process.env.REDIS_URL || "redis://localhost:6379").trim(); const parsed = new URL(raw); return { host: parsed.hostname, port: parsed.port ? Number(parsed.port) : 6379, username: parsed.username ? decodeURIComponent(parsed.username) : undefined, password: parsed.password ? decodeURIComponent(parsed.password) : undefined, db: parsed.pathname && parsed.pathname !== "/" ? Number(parsed.pathname.slice(1)) : undefined, maxRetriesPerRequest: null, }; } function ensureHttpUrl(value: string) { const raw = (value ?? "").trim(); if (!raw) throw new Error("endpoint is required"); const parsed = new URL(raw); if (parsed.protocol !== "http:" && parsed.protocol !== "https:") { throw new Error(`Unsupported endpoint protocol: ${parsed.protocol}`); } return parsed.toString(); } function compactError(error: unknown) { if (!error) return "unknown_error"; if (typeof error === "string") return error; const anyErr = error as any; return String(anyErr?.message ?? anyErr); } function extractProviderMessageId(body: unknown): string | null { const obj = body as any; if (!obj || typeof obj !== "object") return null; const candidate = obj?.message_id ?? obj?.messageId ?? obj?.id ?? obj?.result?.message_id ?? obj?.result?.id ?? null; if (candidate == null) return null; return String(candidate); } function asObject(value: unknown): Record { if (!value || typeof value !== "object" || Array.isArray(value)) return {}; return value as Record; } export function outboundDeliveryQueue() { return new Queue(OUTBOUND_DELIVERY_QUEUE_NAME, { connection: redisConnectionFromEnv(), defaultJobOptions: { removeOnComplete: { count: 1000 }, removeOnFail: { count: 5000 }, }, }); } export async function enqueueOutboundDelivery(input: OutboundDeliveryJob, opts?: JobsOptions) { const endpoint = ensureHttpUrl(input.endpoint); const q = outboundDeliveryQueue(); const payload = (input.payload ?? null) as Prisma.InputJsonValue; const existing = await prisma.omniMessage.findUnique({ where: { id: input.omniMessageId }, select: { rawJson: true }, }); const raw = asObject(existing?.rawJson); const rawQueue = asObject(raw.queue); const rawDeliveryRequest = asObject(raw.deliveryRequest); // Keep source message in pending before actual send starts. await prisma.omniMessage.update({ where: { id: input.omniMessageId }, data: { status: "PENDING", rawJson: { ...raw, queue: { ...rawQueue, queueName: OUTBOUND_DELIVERY_QUEUE_NAME, enqueuedAt: new Date().toISOString(), }, deliveryRequest: { ...rawDeliveryRequest, endpoint, method: input.method ?? "POST", channel: input.channel ?? null, provider: input.provider ?? null, payload, }, }, }, }); return q.add("deliver", { ...input, endpoint }, { jobId: `omni-${input.omniMessageId}`, attempts: 12, backoff: { type: "exponential", delay: 1000 }, ...opts, }); } export function startOutboundDeliveryWorker() { return new Worker( OUTBOUND_DELIVERY_QUEUE_NAME, async (job) => { const msg = await prisma.omniMessage.findUnique({ where: { id: job.data.omniMessageId }, include: { thread: true }, }); if (!msg) return; // Idempotency: if already sent/delivered, do not resend. if ((msg.status === "SENT" || msg.status === "DELIVERED" || msg.status === "READ") && msg.providerMessageId) { return; } const endpoint = ensureHttpUrl(job.data.endpoint); const method = job.data.method ?? "POST"; const headers: Record = { "content-type": "application/json", ...(job.data.headers ?? {}), }; const requestPayload = (job.data.payload ?? null) as Prisma.InputJsonValue; const requestStartedAt = new Date().toISOString(); try { const response = await fetch(endpoint, { method, headers, body: JSON.stringify(requestPayload ?? {}), }); const text = await response.text(); const responseBody = (() => { try { return JSON.parse(text); } catch { return text; } })(); if (!response.ok) { throw new Error(`HTTP ${response.status}: ${typeof responseBody === "string" ? responseBody : JSON.stringify(responseBody)}`); } const providerMessageId = extractProviderMessageId(responseBody); const raw = asObject(msg.rawJson); const rawQueue = asObject(raw.queue); const rawDeliveryRequest = asObject(raw.deliveryRequest); await prisma.omniMessage.update({ where: { id: msg.id }, data: { status: "SENT", providerMessageId, rawJson: { ...raw, queue: { ...rawQueue, queueName: OUTBOUND_DELIVERY_QUEUE_NAME, completedAt: new Date().toISOString(), attemptsMade: job.attemptsMade + 1, }, deliveryRequest: { ...rawDeliveryRequest, endpoint, method, channel: job.data.channel ?? null, provider: job.data.provider ?? null, startedAt: requestStartedAt, payload: requestPayload, }, deliveryResponse: { status: response.status, body: responseBody, }, }, }, }); } catch (error) { const isLastAttempt = typeof job.opts.attempts === "number" && job.attemptsMade + 1 >= job.opts.attempts; if (isLastAttempt) { const raw = asObject(msg.rawJson); const rawQueue = asObject(raw.queue); const rawDeliveryRequest = asObject(raw.deliveryRequest); await prisma.omniMessage.update({ where: { id: msg.id }, data: { status: "FAILED", rawJson: { ...raw, queue: { ...rawQueue, queueName: OUTBOUND_DELIVERY_QUEUE_NAME, failedAt: new Date().toISOString(), attemptsMade: job.attemptsMade + 1, }, deliveryRequest: { ...rawDeliveryRequest, endpoint, method, channel: job.data.channel ?? null, provider: job.data.provider ?? null, startedAt: requestStartedAt, payload: requestPayload, }, deliveryError: { message: compactError(error), }, }, }, }); } throw error; } }, { connection: redisConnectionFromEnv() }, ); }