217 lines
6.6 KiB
TypeScript
217 lines
6.6 KiB
TypeScript
import { Queue, Worker, type JobsOptions, type ConnectionOptions } from "bullmq";
|
|
import { Prisma } from "@prisma/client";
|
|
import { prisma } from "../utils/prisma";
|
|
|
|
/**
 * Returns the first candidate that is non-empty after trimming, or `undefined`
 * when every candidate is missing or blank.
 */
function firstNonBlank(...candidates: Array<string | undefined>): string | undefined {
  for (const candidate of candidates) {
    const trimmed = candidate?.trim();
    if (trimmed) return trimmed;
  }
  return undefined;
}

/**
 * Queue name shared by the producer and the worker in this module.
 *
 * Resolution order: `SENDER_FLOW_QUEUE_NAME`, then `OUTBOUND_DELIVERY_QUEUE_NAME`,
 * then the literal `"sender.flow"`. Each candidate is trimmed *before* it is
 * considered, so a whitespace-only variable falls through to the next candidate
 * instead of producing an empty queue name.
 */
export const OUTBOUND_DELIVERY_QUEUE_NAME: string =
  firstNonBlank(process.env.SENDER_FLOW_QUEUE_NAME, process.env.OUTBOUND_DELIVERY_QUEUE_NAME) ??
  "sender.flow";
|
|
|
|
/** HTTP verbs a delivery job may use; the producer and worker default to "POST". */
type OutboundDeliveryMethod = "POST" | "PUT" | "PATCH";

/** Payload carried by each job on the outbound delivery queue. */
export type OutboundDeliveryJob = {
  /** Primary key of the `omniMessage` row whose status tracks this delivery. */
  omniMessageId: string;
  /** Target URL; validated as http(s) before enqueueing and before sending. */
  endpoint: string;
  /** HTTP method for the request. */
  method?: OutboundDeliveryMethod;
  /** Extra request headers layered on top of the JSON content type. */
  headers?: Record<string, string>;
  /** Value that is JSON-serialized into the request body. */
  payload: unknown;
  /** Optional channel label recorded alongside the delivery request. */
  channel?: string;
  /** Optional provider label recorded alongside the delivery request. */
  provider?: string;
};
|
|
|
|
/**
 * Builds the Redis connection options for BullMQ from `REDIS_URL`.
 *
 * - A missing or blank `REDIS_URL` falls back to `redis://localhost:6379`.
 * - `rediss://` URLs enable TLS; without this the scheme was silently ignored
 *   and the connection was attempted in plain text.
 * - The database index is taken from the URL path only when it is a plain
 *   non-negative integer; anything else leaves `db` unset rather than `NaN`.
 *
 * @returns Connection options suitable for both `Queue` and `Worker`.
 * @throws TypeError when `REDIS_URL` is set but is not a parseable URL.
 */
function redisConnectionFromEnv(): ConnectionOptions {
  // Trim before applying the fallback so a whitespace-only value does not reach `new URL("")`.
  const raw = process.env.REDIS_URL?.trim() || "redis://localhost:6379";
  const parsed = new URL(raw);

  const dbSegment = parsed.pathname.replace(/^\//, "");
  const db = /^\d+$/.test(dbSegment) ? Number(dbSegment) : undefined;

  return {
    host: parsed.hostname,
    port: parsed.port ? Number(parsed.port) : 6379,
    username: parsed.username ? decodeURIComponent(parsed.username) : undefined,
    password: parsed.password ? decodeURIComponent(parsed.password) : undefined,
    db,
    // Only add the key for TLS URLs so plain connections keep their previous shape.
    ...(parsed.protocol === "rediss:" ? { tls: {} } : {}),
    // BullMQ documents that worker connections need this set to null.
    maxRetriesPerRequest: null,
  };
}
|
|
|
|
/**
 * Validates that `value` is an absolute http(s) URL and returns its normalized form.
 *
 * @param value Endpoint as supplied by the caller or read back from job data.
 * @returns The URL serialized by the WHATWG URL parser (e.g. with a trailing `/` on bare origins).
 * @throws Error when the value is missing/blank, cannot be parsed as a URL,
 *   or uses a protocol other than `http:` / `https:`.
 */
function ensureHttpUrl(value: string): string {
  // Job data is deserialized from Redis, so guard against non-string values at runtime.
  const raw = typeof value === "string" ? value.trim() : "";
  if (!raw) throw new Error("endpoint is required");

  let parsed: URL;
  try {
    parsed = new URL(raw);
  } catch {
    // The built-in error is a bare "Invalid URL"; name the endpoint so stored failures are actionable.
    throw new Error(`Invalid endpoint URL: ${raw}`);
  }

  if (parsed.protocol !== "http:" && parsed.protocol !== "https:") {
    throw new Error(`Unsupported endpoint protocol: ${parsed.protocol}`);
  }
  return parsed.toString();
}
|
|
|
|
/**
 * Reduces an arbitrary thrown value to a single human-readable string.
 *
 * - Falsy values become `"unknown_error"`.
 * - Strings are returned unchanged.
 * - Objects with a `message` property return that message.
 * - Other objects are JSON-serialized so they do not collapse to `"[object Object]"`;
 *   if serialization fails (e.g. circular references) `String(error)` is used.
 *
 * @param error Whatever was caught.
 * @returns A non-structured description of the error.
 */
function compactError(error: unknown): string {
  if (!error) return "unknown_error";
  if (typeof error === "string") return error;

  if (typeof error === "object") {
    const message = (error as { message?: unknown }).message;
    if (message != null) return String(message);

    try {
      const json = JSON.stringify(error);
      if (json !== undefined) return json;
    } catch {
      // Not serializable; fall through to the generic conversion below.
    }
  }

  return String(error);
}
|
|
|
|
/**
 * Pulls a provider-assigned message identifier out of a parsed response body.
 *
 * Fields are checked in this order: `message_id`, `messageId`, `id`,
 * `result.message_id`, `result.id`. Only non-blank strings, finite numbers and
 * bigints count as identifiers; objects, booleans and blank strings are skipped
 * so they neither stringify to `"[object Object]"`/`""` nor shadow a usable
 * field later in the list.
 *
 * @param body Parsed response body (any shape).
 * @returns The identifier as a string, or `null` when none is present.
 */
function extractProviderMessageId(body: unknown): string | null {
  if (!body || typeof body !== "object") return null;

  const top = body as Record<string, unknown>;
  const nested =
    top.result && typeof top.result === "object" ? (top.result as Record<string, unknown>) : undefined;

  const candidates: unknown[] = [top.message_id, top.messageId, top.id, nested?.message_id, nested?.id];

  for (const candidate of candidates) {
    if (typeof candidate === "string") {
      if (candidate.trim()) return candidate;
      continue;
    }
    if (typeof candidate === "bigint") return String(candidate);
    if (typeof candidate === "number" && Number.isFinite(candidate)) return String(candidate);
  }
  return null;
}
|
|
|
|
/**
 * Creates a new BullMQ queue handle for outbound delivery jobs.
 *
 * Every call constructs a fresh `Queue` with its own connection options, so the
 * caller owns the returned instance. Finished jobs are pruned by count: the
 * latest 1000 completed and 5000 failed jobs are retained.
 *
 * @returns A queue whose jobs are named "deliver" and carry an `OutboundDeliveryJob`.
 */
export function outboundDeliveryQueue() {
  const queue = new Queue<OutboundDeliveryJob, unknown, "deliver">(OUTBOUND_DELIVERY_QUEUE_NAME, {
    connection: redisConnectionFromEnv(),
    defaultJobOptions: {
      removeOnComplete: { count: 1000 },
      removeOnFail: { count: 5000 },
    },
  });
  return queue;
}
|
|
|
|
/**
 * Producer-side queue handle, created on first use and reused afterwards.
 * Building a new `Queue` per call opened a Redis connection that was never closed.
 */
let sharedProducerQueue: ReturnType<typeof outboundDeliveryQueue> | undefined;

/**
 * Marks an omni message as pending and schedules its HTTP delivery.
 *
 * Steps, in order:
 * 1. Validate and normalize `input.endpoint` (throws before touching the DB or queue).
 * 2. Set the message status to "PENDING" and replace `rawJson` with the queue
 *    metadata and the delivery request. Request headers are not persisted there.
 * 3. Add a "deliver" job keyed by the message id, so adding the same message
 *    again maps to the same job id.
 *
 * The status is written before the job is added so a fast worker cannot have
 * its result overwritten by a late "PENDING" update.
 *
 * @param input Delivery description; `endpoint` must be an http(s) URL.
 * @param opts Optional BullMQ job options; they override the defaults
 *   (job id, 12 attempts, exponential backoff starting at 1s).
 * @returns The promise returned by `Queue.add`.
 * @throws Error when the endpoint is invalid; also rejects when the database
 *   update or the queue add rejects.
 */
export async function enqueueOutboundDelivery(input: OutboundDeliveryJob, opts?: JobsOptions) {
  const endpoint = ensureHttpUrl(input.endpoint);

  if (!sharedProducerQueue) {
    sharedProducerQueue = outboundDeliveryQueue();
  }
  const q = sharedProducerQueue;

  const payload = (input.payload ?? null) as Prisma.InputJsonValue;

  // Keep source message in pending before actual send starts.
  await prisma.omniMessage.update({
    where: { id: input.omniMessageId },
    data: {
      status: "PENDING",
      rawJson: {
        queue: {
          queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
          enqueuedAt: new Date().toISOString(),
        },
        deliveryRequest: {
          endpoint,
          method: input.method ?? "POST",
          channel: input.channel ?? null,
          provider: input.provider ?? null,
          payload,
        },
      },
    },
  });

  return q.add(
    "deliver",
    { ...input, endpoint },
    {
      // BullMQ documents that custom job ids must not contain ":" (its Redis key
      // separator), so "-" is used between the prefix and the message id.
      jobId: `omni-${input.omniMessageId}`,
      attempts: 12,
      backoff: { type: "exponential", delay: 1000 },
      ...opts,
    },
  );
}
|
|
|
|
/** Upper bound, in milliseconds, for one delivery request when no override is configured. */
const DEFAULT_DELIVERY_TIMEOUT_MS = 30000;

/**
 * Reads `OUTBOUND_DELIVERY_TIMEOUT_MS`, falling back to the default when it is
 * unset or not a positive finite number.
 */
function deliveryTimeoutMs(): number {
  const configured = Number(process.env.OUTBOUND_DELIVERY_TIMEOUT_MS);
  return Number.isFinite(configured) && configured > 0 ? configured : DEFAULT_DELIVERY_TIMEOUT_MS;
}

/**
 * Builds the request headers: JSON content type by default, with caller headers
 * applied on top. `Headers.set` matches names case-insensitively, so a caller's
 * "Content-Type" replaces the default instead of being combined with it.
 */
function buildDeliveryHeaders(custom: Record<string, string> | undefined): Headers {
  const headers = new Headers({ "content-type": "application/json" });
  for (const [name, value] of Object.entries(custom ?? {})) {
    headers.set(name, value);
  }
  return headers;
}

/** Parses a response body as JSON when possible; otherwise returns the raw text. */
function parseDeliveryResponseBody(text: string): Prisma.InputJsonValue | null {
  try {
    return JSON.parse(text) as Prisma.InputJsonValue | null;
  } catch {
    return text;
  }
}

/**
 * Starts the worker that performs the HTTP delivery for queued omni messages.
 *
 * Per job:
 * - Loads the message; a missing row, or one already SENT/DELIVERED/READ with a
 *   provider message id, completes the job without sending.
 * - Sends the payload as JSON with a request timeout. Non-2xx responses and
 *   network/timeout errors throw so BullMQ applies the job's retry policy.
 * - On success, stores status "SENT", the provider message id (if one can be
 *   extracted) and a request/response record in `rawJson`.
 * - On the final attempt, stores status "FAILED" with the error message.
 *   Jobs without an explicit `attempts` option are treated as single-attempt.
 *
 * NOTE(review): if the "SENT" database write fails after a successful HTTP call,
 * the job is retried and the request is sent again — confirm the provider
 * tolerates duplicates or add a delivered marker to the job.
 *
 * @returns The running BullMQ `Worker`; the caller owns its lifecycle.
 */
export function startOutboundDeliveryWorker() {
  return new Worker<OutboundDeliveryJob, unknown, "deliver">(
    OUTBOUND_DELIVERY_QUEUE_NAME,
    async (job) => {
      const msg = await prisma.omniMessage.findUnique({
        where: { id: job.data.omniMessageId },
      });
      if (!msg) return;

      // Idempotency: if already sent/delivered, do not resend.
      const alreadyHandedOff =
        msg.status === "SENT" || msg.status === "DELIVERED" || msg.status === "READ";
      if (alreadyHandedOff && msg.providerMessageId) {
        return;
      }

      const method = job.data.method ?? "POST";
      const requestPayload = (job.data.payload ?? null) as Prisma.InputJsonValue;
      const requestStartedAt = new Date().toISOString();
      const attemptNumber = job.attemptsMade + 1;

      // Starts as the raw value so a validation failure can still be recorded;
      // replaced by the normalized URL once validation succeeds.
      let endpoint = String(job.data.endpoint ?? "");

      const describeRequest = () => ({
        endpoint,
        method,
        channel: job.data.channel ?? null,
        provider: job.data.provider ?? null,
        startedAt: requestStartedAt,
        payload: requestPayload,
      });

      try {
        // Validated inside the try so an invalid endpoint reaches the FAILED bookkeeping below.
        endpoint = ensureHttpUrl(job.data.endpoint);

        const response = await fetch(endpoint, {
          method,
          headers: buildDeliveryHeaders(job.data.headers),
          body: JSON.stringify(requestPayload ?? {}),
          // Without a timeout a hung endpoint would hold this worker slot indefinitely.
          signal: AbortSignal.timeout(deliveryTimeoutMs()),
        });

        const responseBody = parseDeliveryResponseBody(await response.text());

        if (!response.ok) {
          throw new Error(
            `HTTP ${response.status}: ${typeof responseBody === "string" ? responseBody : JSON.stringify(responseBody)}`,
          );
        }

        const providerMessageId = extractProviderMessageId(responseBody);
        await prisma.omniMessage.update({
          where: { id: msg.id },
          data: {
            status: "SENT",
            providerMessageId,
            rawJson: {
              queue: {
                queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
                completedAt: new Date().toISOString(),
                attemptsMade: attemptNumber,
              },
              deliveryRequest: describeRequest(),
              deliveryResponse: {
                status: response.status,
                body: responseBody,
              },
            },
          },
        });
      } catch (error) {
        // BullMQ runs a job once when `attempts` is not set.
        const maxAttempts = job.opts.attempts ?? 1;
        const isLastAttempt = attemptNumber >= maxAttempts;

        if (isLastAttempt) {
          await prisma.omniMessage.update({
            where: { id: msg.id },
            data: {
              status: "FAILED",
              rawJson: {
                queue: {
                  queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
                  failedAt: new Date().toISOString(),
                  attemptsMade: attemptNumber,
                },
                deliveryRequest: describeRequest(),
                deliveryError: {
                  message: compactError(error),
                },
              },
            },
          });
        }

        // Rethrow so BullMQ records the failure and schedules any remaining retry.
        throw error;
      }
    },
    { connection: redisConnectionFromEnv() },
  );
}
|