refactor chat delivery to graphql + hatchet services

This commit is contained in:
Ruslan Bakiev
2026-03-08 18:55:58 +07:00
parent fe4bd59248
commit 7d1bed0d67
61 changed files with 5007 additions and 5004 deletions

View File

@@ -1,60 +1,6 @@
import { readBody } from "h3";
import { getAuthContext } from "../../../utils/auth";
import { prisma } from "../../../utils/prisma";
import { enqueueOutboundDelivery } from "../../../queues/outboundDelivery";
type EnqueueBody = {
omniMessageId?: string;
endpoint?: string;
method?: "POST" | "PUT" | "PATCH";
headers?: Record<string, string>;
payload?: unknown;
provider?: string;
channel?: string;
attempts?: number;
};
export default defineEventHandler(async (event) => {
const auth = await getAuthContext(event);
const body = await readBody<EnqueueBody>(event);
const omniMessageId = String(body?.omniMessageId ?? "").trim();
const endpoint = String(body?.endpoint ?? "").trim();
if (!omniMessageId) {
throw createError({ statusCode: 400, statusMessage: "omniMessageId is required" });
}
if (!endpoint) {
throw createError({ statusCode: 400, statusMessage: "endpoint is required" });
}
const msg = await prisma.omniMessage.findFirst({
where: { id: omniMessageId, teamId: auth.teamId },
select: { id: true },
throw createError({
statusCode: 410,
statusMessage: "Legacy delivery enqueue is disabled. Use backend GraphQL requestTelegramOutbound.",
});
if (!msg) {
throw createError({ statusCode: 404, statusMessage: "omni message not found" });
}
const attempts = Math.max(1, Math.min(Number(body?.attempts ?? 12), 50));
const job = await enqueueOutboundDelivery(
{
omniMessageId,
endpoint,
method: body?.method ?? "POST",
headers: body?.headers ?? {},
payload: body?.payload ?? {},
provider: body?.provider ?? undefined,
channel: body?.channel ?? undefined,
},
{
attempts,
},
);
return {
ok: true,
queue: process.env.SENDER_FLOW_QUEUE_NAME || process.env.OUTBOUND_DELIVERY_QUEUE_NAME || "sender.flow",
jobId: job.id,
omniMessageId,
};
});

View File

@@ -1,11 +1,79 @@
import { readBody } from "h3";
import { getAuthContext } from "../../../utils/auth";
import { prisma } from "../../../utils/prisma";
import { enqueueTelegramSend } from "../../../queues/telegramSend";
type BackendGraphqlResponse<T> = {
data?: T;
errors?: Array<{ message?: string }>;
};
function asString(value: unknown) {
if (typeof value !== "string") return null;
const v = value.trim();
return v || null;
}
async function requestTelegramOutbound(input: {
omniMessageId: string;
chatId: string;
text: string;
businessConnectionId?: string | null;
}) {
type Out = {
requestTelegramOutbound: {
ok: boolean;
message: string;
runId?: string | null;
};
};
const url = asString(process.env.BACKEND_GRAPHQL_URL);
if (!url) throw new Error("BACKEND_GRAPHQL_URL is required");
const headers: Record<string, string> = {
"content-type": "application/json",
};
const secret = asString(process.env.BACKEND_GRAPHQL_SHARED_SECRET);
if (secret) {
headers["x-graphql-secret"] = secret;
}
const query = `mutation RequestTelegramOutbound($input: TelegramOutboundTaskInput!) {
requestTelegramOutbound(input: $input) {
ok
message
runId
}
}`;
const response = await fetch(url, {
method: "POST",
headers,
body: JSON.stringify({
operationName: "RequestTelegramOutbound",
query,
variables: { input },
}),
});
const payload = (await response.json()) as BackendGraphqlResponse<Out>;
if (!response.ok || payload.errors?.length) {
const message =
payload.errors?.map((error) => error.message).filter(Boolean).join("; ") || `HTTP ${response.status}`;
throw new Error(message);
}
const result = payload.data?.requestTelegramOutbound;
if (!result?.ok) {
throw new Error(result?.message || "requestTelegramOutbound failed");
}
return result;
}
export default defineEventHandler(async (event) => {
const auth = await getAuthContext(event);
const body = await readBody<{ omniMessageId?: string; attempts?: number }>(event);
const body = await readBody<{ omniMessageId?: string }>(event);
const omniMessageId = String(body?.omniMessageId ?? "").trim();
if (!omniMessageId) {
@@ -14,19 +82,26 @@ export default defineEventHandler(async (event) => {
const msg = await prisma.omniMessage.findFirst({
where: { id: omniMessageId, teamId: auth.teamId, channel: "TELEGRAM", direction: "OUT" },
select: { id: true },
select: {
id: true,
text: true,
thread: { select: { externalChatId: true, businessConnectionId: true } },
},
});
if (!msg) {
throw createError({ statusCode: 404, statusMessage: "telegram outbound message not found" });
}
const attempts = Math.max(1, Math.min(Number(body?.attempts ?? 12), 50));
const job = await enqueueTelegramSend({ omniMessageId }, { attempts });
const result = await requestTelegramOutbound({
omniMessageId: msg.id,
chatId: msg.thread.externalChatId,
text: msg.text,
businessConnectionId: msg.thread.businessConnectionId ?? null,
});
return {
ok: true,
queue: process.env.SENDER_FLOW_QUEUE_NAME || process.env.OUTBOUND_DELIVERY_QUEUE_NAME || "sender.flow",
jobId: job.id,
runId: result.runId ?? null,
omniMessageId,
};
});

View File

@@ -1,9 +1,4 @@
import { startTelegramSendWorker } from "../queues/telegramSend";
export default defineNitroPlugin(() => {
// Disabled by default. If you need background processing, wire it explicitly.
if (process.env.RUN_QUEUE_WORKER !== "1") return;
startTelegramSendWorker();
// Legacy BullMQ worker was removed.
// Delivery orchestration now runs in hatchet/telegram_worker service.
});

View File

@@ -8,7 +8,6 @@ import { normalizePhone, verifyPassword } from "../utils/password";
import { persistAiMessage, runCrmAgentFor } from "../agent/crmAgent";
import { buildChangeSet, captureSnapshot, rollbackChangeSet, rollbackChangeSetItems } from "../utils/changeSet";
import type { ChangeSet } from "../utils/changeSet";
import { enqueueTelegramSend } from "../queues/telegramSend";
import { datasetRoot } from "../dataset/paths";
type GraphQLContext = {
@@ -16,6 +15,11 @@ type GraphQLContext = {
event: H3Event;
};
type BackendGraphqlResponse<T> = {
data?: T;
errors?: Array<{ message?: string }>;
};
function requireAuth(auth: AuthContext | null) {
if (!auth) {
throw new Error("Unauthorized");
@@ -45,6 +49,79 @@ function asObject(value: unknown): Record<string, unknown> {
return value as Record<string, unknown>;
}
function asString(value: unknown) {
if (typeof value !== "string") return null;
const v = value.trim();
return v || null;
}
async function requestTelegramOutboundFromBackend(input: {
omniMessageId: string;
chatId: string;
text: string;
businessConnectionId?: string | null;
}) {
type Out = {
requestTelegramOutbound: {
ok: boolean;
message: string;
runId?: string | null;
};
};
const url = asString(process.env.BACKEND_GRAPHQL_URL);
if (!url) {
throw new Error("BACKEND_GRAPHQL_URL is required");
}
const headers: Record<string, string> = {
"content-type": "application/json",
};
const secret = asString(process.env.BACKEND_GRAPHQL_SHARED_SECRET);
if (secret) {
headers["x-graphql-secret"] = secret;
}
const query = `mutation RequestTelegramOutbound($input: TelegramOutboundTaskInput!) {
requestTelegramOutbound(input: $input) {
ok
message
runId
}
}`;
const response = await fetch(url, {
method: "POST",
headers,
body: JSON.stringify({
operationName: "RequestTelegramOutbound",
query,
variables: {
input: {
omniMessageId: input.omniMessageId,
chatId: input.chatId,
text: input.text,
businessConnectionId: input.businessConnectionId ?? null,
},
},
}),
});
const payload = (await response.json()) as BackendGraphqlResponse<Out>;
if (!response.ok || payload.errors?.length) {
const message = payload.errors?.map((error) => error.message).filter(Boolean).join("; ") || `HTTP ${response.status}`;
throw new Error(message);
}
const result = payload.data?.requestTelegramOutbound;
if (!result?.ok) {
throw new Error(result?.message || "requestTelegramOutbound failed");
}
return result;
}
function readNestedString(obj: Record<string, unknown>, path: string[]): string {
let current: unknown = obj;
for (const segment of path) {
@@ -1416,7 +1493,7 @@ async function createCommunication(auth: AuthContext | null, input: {
channel: "TELEGRAM",
},
orderBy: { updatedAt: "desc" },
select: { id: true, externalChatId: true, title: true },
select: { id: true, externalChatId: true, businessConnectionId: true, title: true },
});
if (!thread) {
throw new Error("telegram thread not found for contact");
@@ -1464,7 +1541,12 @@ async function createCommunication(auth: AuthContext | null, input: {
});
try {
await enqueueTelegramSend({ omniMessageId: omniMessage.id });
await requestTelegramOutboundFromBackend({
omniMessageId: omniMessage.id,
chatId: thread.externalChatId,
text: content,
businessConnectionId: thread.businessConnectionId ?? null,
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
const existingOmni = await prisma.omniMessage.findUnique({
@@ -1486,7 +1568,7 @@ async function createCommunication(auth: AuthContext | null, input: {
},
},
}).catch(() => undefined);
throw new Error(`telegram enqueue failed: ${message}`);
throw new Error(`telegram outbound request failed: ${message}`);
}
} else {
const existingInbox = await prisma.contactInbox.findFirst({

View File

@@ -1,243 +0,0 @@
import { Queue, Worker, type JobsOptions, type ConnectionOptions } from "bullmq";
import { Prisma } from "../generated/prisma/client";
import { prisma } from "../utils/prisma";
export const OUTBOUND_DELIVERY_QUEUE_NAME = (
process.env.SENDER_FLOW_QUEUE_NAME ||
process.env.OUTBOUND_DELIVERY_QUEUE_NAME ||
"sender.flow"
).trim();
export type OutboundDeliveryJob = {
omniMessageId: string;
endpoint: string;
method?: "POST" | "PUT" | "PATCH";
headers?: Record<string, string>;
payload: unknown;
channel?: string;
provider?: string;
};
function redisConnectionFromEnv(): ConnectionOptions {
const raw = (process.env.REDIS_URL || "redis://localhost:6379").trim();
const parsed = new URL(raw);
return {
host: parsed.hostname,
port: parsed.port ? Number(parsed.port) : 6379,
username: parsed.username ? decodeURIComponent(parsed.username) : undefined,
password: parsed.password ? decodeURIComponent(parsed.password) : undefined,
db: parsed.pathname && parsed.pathname !== "/" ? Number(parsed.pathname.slice(1)) : undefined,
maxRetriesPerRequest: null,
};
}
function ensureHttpUrl(value: string) {
const raw = (value ?? "").trim();
if (!raw) throw new Error("endpoint is required");
const parsed = new URL(raw);
if (parsed.protocol !== "http:" && parsed.protocol !== "https:") {
throw new Error(`Unsupported endpoint protocol: ${parsed.protocol}`);
}
return parsed.toString();
}
function compactError(error: unknown) {
if (!error) return "unknown_error";
if (typeof error === "string") return error;
const anyErr = error as any;
return String(anyErr?.message ?? anyErr);
}
function extractProviderMessageId(body: unknown): string | null {
const obj = body as any;
if (!obj || typeof obj !== "object") return null;
const candidate =
obj?.message_id ??
obj?.messageId ??
obj?.id ??
obj?.result?.message_id ??
obj?.result?.id ??
null;
if (candidate == null) return null;
return String(candidate);
}
function asObject(value: unknown): Record<string, unknown> {
if (!value || typeof value !== "object" || Array.isArray(value)) return {};
return value as Record<string, unknown>;
}
export function outboundDeliveryQueue() {
return new Queue<OutboundDeliveryJob, unknown, "deliver">(OUTBOUND_DELIVERY_QUEUE_NAME, {
connection: redisConnectionFromEnv(),
defaultJobOptions: {
removeOnComplete: { count: 1000 },
removeOnFail: { count: 5000 },
},
});
}
export async function enqueueOutboundDelivery(input: OutboundDeliveryJob, opts?: JobsOptions) {
const endpoint = ensureHttpUrl(input.endpoint);
const q = outboundDeliveryQueue();
const payload = (input.payload ?? null) as Prisma.InputJsonValue;
const existing = await prisma.omniMessage.findUnique({
where: { id: input.omniMessageId },
select: { rawJson: true },
});
const raw = asObject(existing?.rawJson);
const rawQueue = asObject(raw.queue);
const rawDeliveryRequest = asObject(raw.deliveryRequest);
// Keep source message in pending before actual send starts.
await prisma.omniMessage.update({
where: { id: input.omniMessageId },
data: {
status: "PENDING",
rawJson: {
...raw,
queue: {
...rawQueue,
queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
enqueuedAt: new Date().toISOString(),
},
deliveryRequest: {
...rawDeliveryRequest,
endpoint,
method: input.method ?? "POST",
channel: input.channel ?? null,
provider: input.provider ?? null,
payload,
},
},
},
});
return q.add("deliver", { ...input, endpoint }, {
jobId: `omni-${input.omniMessageId}`,
attempts: 12,
backoff: { type: "exponential", delay: 1000 },
...opts,
});
}
export function startOutboundDeliveryWorker() {
return new Worker<OutboundDeliveryJob, unknown, "deliver">(
OUTBOUND_DELIVERY_QUEUE_NAME,
async (job) => {
const msg = await prisma.omniMessage.findUnique({
where: { id: job.data.omniMessageId },
include: { thread: true },
});
if (!msg) return;
// Idempotency: if already sent/delivered, do not resend.
if ((msg.status === "SENT" || msg.status === "DELIVERED" || msg.status === "READ") && msg.providerMessageId) {
return;
}
const endpoint = ensureHttpUrl(job.data.endpoint);
const method = job.data.method ?? "POST";
const headers: Record<string, string> = {
"content-type": "application/json",
...(job.data.headers ?? {}),
};
const requestPayload = (job.data.payload ?? null) as Prisma.InputJsonValue;
const requestStartedAt = new Date().toISOString();
try {
const response = await fetch(endpoint, {
method,
headers,
body: JSON.stringify(requestPayload ?? {}),
});
const text = await response.text();
const responseBody = (() => {
try {
return JSON.parse(text);
} catch {
return text;
}
})();
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${typeof responseBody === "string" ? responseBody : JSON.stringify(responseBody)}`);
}
const providerMessageId = extractProviderMessageId(responseBody);
const raw = asObject(msg.rawJson);
const rawQueue = asObject(raw.queue);
const rawDeliveryRequest = asObject(raw.deliveryRequest);
await prisma.omniMessage.update({
where: { id: msg.id },
data: {
status: "SENT",
providerMessageId,
rawJson: {
...raw,
queue: {
...rawQueue,
queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
completedAt: new Date().toISOString(),
attemptsMade: job.attemptsMade + 1,
},
deliveryRequest: {
...rawDeliveryRequest,
endpoint,
method,
channel: job.data.channel ?? null,
provider: job.data.provider ?? null,
startedAt: requestStartedAt,
payload: requestPayload,
},
deliveryResponse: {
status: response.status,
body: responseBody,
},
},
},
});
} catch (error) {
const isLastAttempt =
typeof job.opts.attempts === "number" && job.attemptsMade + 1 >= job.opts.attempts;
if (isLastAttempt) {
const raw = asObject(msg.rawJson);
const rawQueue = asObject(raw.queue);
const rawDeliveryRequest = asObject(raw.deliveryRequest);
await prisma.omniMessage.update({
where: { id: msg.id },
data: {
status: "FAILED",
rawJson: {
...raw,
queue: {
...rawQueue,
queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
failedAt: new Date().toISOString(),
attemptsMade: job.attemptsMade + 1,
},
deliveryRequest: {
...rawDeliveryRequest,
endpoint,
method,
channel: job.data.channel ?? null,
provider: job.data.provider ?? null,
startedAt: requestStartedAt,
payload: requestPayload,
},
deliveryError: {
message: compactError(error),
},
},
},
});
}
throw error;
}
},
{ connection: redisConnectionFromEnv() },
);
}

View File

@@ -1,65 +0,0 @@
import type { JobsOptions } from "bullmq";
import { prisma } from "../utils/prisma";
import { telegramApiBase, requireTelegramBotToken } from "../utils/telegram";
import { enqueueOutboundDelivery, startOutboundDeliveryWorker } from "./outboundDelivery";
type TelegramSendJob = {
omniMessageId: string;
};
function asObject(value: unknown): Record<string, unknown> {
if (!value || typeof value !== "object" || Array.isArray(value)) return {};
return value as Record<string, unknown>;
}
function readNestedString(obj: Record<string, unknown>, path: string[]): string {
let current: unknown = obj;
for (const segment of path) {
if (!current || typeof current !== "object" || Array.isArray(current)) return "";
current = (current as Record<string, unknown>)[segment];
}
return typeof current === "string" ? current.trim() : "";
}
export async function enqueueTelegramSend(input: TelegramSendJob, opts?: JobsOptions) {
const msg = await prisma.omniMessage.findUnique({
where: { id: input.omniMessageId },
include: { thread: true },
});
if (!msg) throw new Error(`omni message not found: ${input.omniMessageId}`);
if (msg.channel !== "TELEGRAM" || msg.direction !== "OUT") {
throw new Error(`Invalid omni message for telegram send: ${msg.id}`);
}
const raw = asObject(msg.rawJson);
const text =
readNestedString(raw, ["normalized", "text"]) ||
readNestedString(raw, ["payloadNormalized", "text"]) ||
msg.text;
if (!text) {
throw new Error(`Omni message has empty text payload: ${msg.id}`);
}
const token = requireTelegramBotToken();
const endpoint = `${telegramApiBase()}/bot${token}/sendMessage`;
const payload = {
chat_id: msg.thread.externalChatId,
text,
...(msg.thread.businessConnectionId ? { business_connection_id: msg.thread.businessConnectionId } : {}),
};
return enqueueOutboundDelivery(
{
omniMessageId: msg.id,
endpoint,
method: "POST",
payload,
provider: "telegram_business",
channel: "TELEGRAM",
},
opts,
);
}
export function startTelegramSendWorker() {
return startOutboundDeliveryWorker();
}

View File

@@ -1,34 +0,0 @@
import { OUTBOUND_DELIVERY_QUEUE_NAME, startOutboundDeliveryWorker } from "./outboundDelivery";
import { prisma } from "../utils/prisma";
import { getRedis } from "../utils/redis";
const worker = startOutboundDeliveryWorker();
console.log(`[omni_outbound(legacy-in-frontend)] started queue ${OUTBOUND_DELIVERY_QUEUE_NAME}`);
async function shutdown(signal: string) {
console.log(`[omni_outbound(legacy-in-frontend)] shutting down by ${signal}`);
try {
await worker.close();
} catch {
// ignore shutdown errors
}
try {
const redis = getRedis();
await redis.quit();
} catch {
// ignore shutdown errors
}
try {
await prisma.$disconnect();
} catch {
// ignore shutdown errors
}
process.exit(0);
}
process.on("SIGINT", () => {
void shutdown("SIGINT");
});
process.on("SIGTERM", () => {
void shutdown("SIGTERM");
});

View File

@@ -1,22 +0,0 @@
import Redis from "ioredis";
declare global {
// eslint-disable-next-line no-var
var __redis: Redis | undefined;
}
export function getRedis() {
if (globalThis.__redis) return globalThis.__redis;
const url = process.env.REDIS_URL || "redis://localhost:6379";
const client = new Redis(url, {
maxRetriesPerRequest: null, // recommended for BullMQ
});
if (process.env.NODE_ENV !== "production") {
globalThis.__redis = client;
}
return client;
}