Queue Telegram outbound on createCommunication

This commit is contained in:
Ruslan Bakiev
2026-02-23 08:45:23 +07:00
parent 05324de216
commit 2f719219c4

View File

@@ -7,6 +7,7 @@ import { normalizePhone, verifyPassword } from "../utils/password";
import { persistChatMessage, runCrmAgentFor } from "../agent/crmAgent";
import { buildChangeSet, captureSnapshot, rollbackChangeSet, rollbackChangeSetItems } from "../utils/changeSet";
import type { ChangeSet } from "../utils/changeSet";
import { enqueueTelegramSend } from "../queues/telegramSend";
type GraphQLContext = {
auth: AuthContext | null;
@@ -584,13 +585,71 @@ async function createCommunication(auth: AuthContext | null, input: {
const occurredAt = input?.at ? new Date(input.at) : new Date();
if (Number.isNaN(occurredAt.getTime())) throw new Error("at is invalid");
const kind = input?.kind === "call" ? "CALL" : "MESSAGE";
const direction = input?.direction === "in" ? "IN" : "OUT";
const channel = toDbChannel(input?.channel ?? "Phone") as any;
const content = (input?.text ?? "").trim();
if (kind === "MESSAGE" && channel === "TELEGRAM" && direction === "OUT") {
const thread = await prisma.omniThread.findFirst({
where: {
teamId: ctx.teamId,
contactId: contact.id,
channel: "TELEGRAM",
},
orderBy: { updatedAt: "desc" },
select: { id: true },
});
if (!thread) {
throw new Error("telegram thread not found for contact");
}
const omniMessage = await prisma.omniMessage.create({
data: {
teamId: ctx.teamId,
contactId: contact.id,
threadId: thread.id,
direction: "OUT",
channel: "TELEGRAM",
status: "PENDING",
text: content,
providerMessageId: null,
providerUpdateId: null,
rawJson: {
source: "graphql.createCommunication",
enqueuedAt: new Date().toISOString(),
},
occurredAt,
},
select: { id: true },
});
try {
await enqueueTelegramSend({ omniMessageId: omniMessage.id });
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
await prisma.omniMessage.update({
where: { id: omniMessage.id },
data: {
status: "FAILED",
rawJson: {
source: "graphql.createCommunication",
deliveryError: { message },
failedAt: new Date().toISOString(),
},
},
}).catch(() => undefined);
throw new Error(`telegram enqueue failed: ${message}`);
}
}
const created = await prisma.contactMessage.create({
data: {
contactId: contact.id,
kind: input?.kind === "call" ? "CALL" : "MESSAGE",
direction: input?.direction === "in" ? "IN" : "OUT",
channel: toDbChannel(input?.channel ?? "Phone") as any,
content: (input?.text ?? "").trim(),
kind,
direction,
channel,
content,
durationSec: typeof input?.durationSec === "number" ? input.durationSec : null,
transcriptJson: Array.isArray(input?.transcript) ? input.transcript : undefined,
occurredAt,
@@ -602,7 +661,6 @@ async function createCommunication(auth: AuthContext | null, input: {
async function createWorkspaceDocument(auth: AuthContext | null, input: {
title: string;
type?: string;
owner?: string;
scope: string;
summary: string;
@@ -614,9 +672,6 @@ async function createWorkspaceDocument(auth: AuthContext | null, input: {
const summary = String(input?.summary ?? "").trim();
const body = String(input?.body ?? "").trim();
const owner = String(input?.owner ?? "").trim() || "Workspace";
const typeRaw = String(input?.type ?? "Template").trim();
const allowedTypes = new Set(["Regulation", "Playbook", "Policy", "Template"]);
const type = allowedTypes.has(typeRaw) ? typeRaw : "Template";
if (!title) throw new Error("title is required");
if (!scope) throw new Error("scope is required");
@@ -626,7 +681,7 @@ async function createWorkspaceDocument(auth: AuthContext | null, input: {
data: {
teamId: ctx.teamId,
title,
type: type as any,
type: "Template",
owner,
scope,
summary,
@@ -1045,7 +1100,6 @@ export const crmGraphqlSchema = buildSchema(`
input CreateWorkspaceDocumentInput {
title: String!
type: String
owner: String
scope: String!
summary: String!
@@ -1292,7 +1346,6 @@ export const crmGraphqlRoot = {
args: {
input: {
title: string;
type?: string;
owner?: string;
scope: string;
summary: string;