Refine CRM chat UX and add DB-backed pin toggle
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
import { Queue, Worker, type JobsOptions } from "bullmq";
|
||||
import { Queue, Worker, type JobsOptions, type ConnectionOptions } from "bullmq";
|
||||
import { Prisma } from "@prisma/client";
|
||||
import { prisma } from "../utils/prisma";
|
||||
import { getRedis } from "../utils/redis";
|
||||
|
||||
export const OUTBOUND_DELIVERY_QUEUE_NAME = "omni-outbound";
|
||||
|
||||
@@ -15,6 +15,19 @@ export type OutboundDeliveryJob = {
|
||||
provider?: string;
|
||||
};
|
||||
|
||||
function redisConnectionFromEnv(): ConnectionOptions {
|
||||
const raw = (process.env.REDIS_URL || "redis://localhost:6379").trim();
|
||||
const parsed = new URL(raw);
|
||||
return {
|
||||
host: parsed.hostname,
|
||||
port: parsed.port ? Number(parsed.port) : 6379,
|
||||
username: parsed.username ? decodeURIComponent(parsed.username) : undefined,
|
||||
password: parsed.password ? decodeURIComponent(parsed.password) : undefined,
|
||||
db: parsed.pathname && parsed.pathname !== "/" ? Number(parsed.pathname.slice(1)) : undefined,
|
||||
maxRetriesPerRequest: null,
|
||||
};
|
||||
}
|
||||
|
||||
function ensureHttpUrl(value: string) {
|
||||
const raw = (value ?? "").trim();
|
||||
if (!raw) throw new Error("endpoint is required");
|
||||
@@ -47,8 +60,8 @@ function extractProviderMessageId(body: unknown): string | null {
|
||||
}
|
||||
|
||||
export function outboundDeliveryQueue() {
|
||||
return new Queue<OutboundDeliveryJob>(OUTBOUND_DELIVERY_QUEUE_NAME, {
|
||||
connection: getRedis(),
|
||||
return new Queue<OutboundDeliveryJob, unknown, "deliver">(OUTBOUND_DELIVERY_QUEUE_NAME, {
|
||||
connection: redisConnectionFromEnv(),
|
||||
defaultJobOptions: {
|
||||
removeOnComplete: { count: 1000 },
|
||||
removeOnFail: { count: 5000 },
|
||||
@@ -60,6 +73,7 @@ export async function enqueueOutboundDelivery(input: OutboundDeliveryJob, opts?:
|
||||
const endpoint = ensureHttpUrl(input.endpoint);
|
||||
const q = outboundDeliveryQueue();
|
||||
|
||||
const payload = (input.payload ?? null) as Prisma.InputJsonValue;
|
||||
// Keep source message in pending before actual send starts.
|
||||
await prisma.omniMessage.update({
|
||||
where: { id: input.omniMessageId },
|
||||
@@ -75,7 +89,7 @@ export async function enqueueOutboundDelivery(input: OutboundDeliveryJob, opts?:
|
||||
method: input.method ?? "POST",
|
||||
channel: input.channel ?? null,
|
||||
provider: input.provider ?? null,
|
||||
payload: input.payload,
|
||||
payload,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -90,7 +104,7 @@ export async function enqueueOutboundDelivery(input: OutboundDeliveryJob, opts?:
|
||||
}
|
||||
|
||||
export function startOutboundDeliveryWorker() {
|
||||
return new Worker<OutboundDeliveryJob>(
|
||||
return new Worker<OutboundDeliveryJob, unknown, "deliver">(
|
||||
OUTBOUND_DELIVERY_QUEUE_NAME,
|
||||
async (job) => {
|
||||
const msg = await prisma.omniMessage.findUnique({
|
||||
@@ -112,12 +126,13 @@ export function startOutboundDeliveryWorker() {
|
||||
...(job.data.headers ?? {}),
|
||||
};
|
||||
|
||||
const requestPayload = (job.data.payload ?? null) as Prisma.InputJsonValue;
|
||||
const requestStartedAt = new Date().toISOString();
|
||||
try {
|
||||
const response = await fetch(endpoint, {
|
||||
method,
|
||||
headers,
|
||||
body: JSON.stringify(job.data.payload ?? {}),
|
||||
body: JSON.stringify(requestPayload ?? {}),
|
||||
signal: AbortSignal.timeout(timeoutMs),
|
||||
});
|
||||
|
||||
@@ -152,7 +167,7 @@ export function startOutboundDeliveryWorker() {
|
||||
channel: job.data.channel ?? null,
|
||||
provider: job.data.provider ?? null,
|
||||
startedAt: requestStartedAt,
|
||||
payload: job.data.payload ?? null,
|
||||
payload: requestPayload,
|
||||
},
|
||||
deliveryResponse: {
|
||||
status: response.status,
|
||||
@@ -182,7 +197,7 @@ export function startOutboundDeliveryWorker() {
|
||||
channel: job.data.channel ?? null,
|
||||
provider: job.data.provider ?? null,
|
||||
startedAt: requestStartedAt,
|
||||
payload: job.data.payload ?? null,
|
||||
payload: requestPayload,
|
||||
},
|
||||
deliveryError: {
|
||||
message: compactError(error),
|
||||
@@ -195,6 +210,6 @@ export function startOutboundDeliveryWorker() {
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
{ connection: getRedis() },
|
||||
{ connection: redisConnectionFromEnv() },
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user