Update chat events/transcription flow; fix container startup

This commit is contained in:
Ruslan Bakiev
2026-02-19 12:54:16 +07:00
parent 7cc86579b2
commit 3ac487c25b
27 changed files with 3888 additions and 780 deletions

View File

@@ -97,6 +97,8 @@ export async function runCrmAgentFor(
teamId: string;
userId: string;
userText: string;
requestId?: string;
conversationId?: string;
onTrace?: (event: AgentTraceEvent) => Promise<void> | void;
},
): Promise<AgentReply> {
@@ -246,29 +248,23 @@ export async function persistChatMessage(input: {
at: string;
}>;
changeSet?: ChangeSet | null;
requestId?: string;
eventType?: "user" | "trace" | "assistant" | "note";
phase?: "pending" | "running" | "final" | "error";
transient?: boolean;
teamId: string;
conversationId: string;
authorUserId?: string | null;
}) {
const hasDebugPayload = Boolean(
(input.plan && input.plan.length) ||
(input.tools && input.tools.length) ||
(input.thinking && input.thinking.length) ||
(input.toolRuns && input.toolRuns.length) ||
input.changeSet,
);
const hasStoredPayload = Boolean(input.changeSet);
const data: Prisma.ChatMessageCreateInput = {
team: { connect: { id: input.teamId } },
conversation: { connect: { id: input.conversationId } },
authorUser: input.authorUserId ? { connect: { id: input.authorUserId } } : undefined,
role: input.role,
text: input.text,
planJson: hasDebugPayload
planJson: hasStoredPayload
? ({
steps: input.plan ?? [],
tools: input.tools ?? [],
thinking: input.thinking ?? input.plan ?? [],
toolRuns: input.toolRuns ?? [],
changeSet: input.changeSet ?? null,
} as any)
: undefined,

View File

@@ -6,11 +6,23 @@ import { createReactAgent } from "@langchain/langgraph/prebuilt";
import { ChatOpenAI } from "@langchain/openai";
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { getLangfuseClient } from "../utils/langfuse";
/** Serializes a Date to its ISO-8601 (UTC) string form. */
function iso(date: Date) {
  const stamp = date.toISOString();
  return stamp;
}
/**
 * Builds the user-facing prompt for one agent cycle.
 * Cycle 1 passes the raw request straight through; later cycles wrap it with
 * accumulated progress notes and the pending staged-change count so the model
 * can continue the same task.
 */
function cyclePrompt(userText: string, cycle: number, cycleNotes: string[], pendingCount: number) {
  if (cycle === 1) {
    return userText;
  }
  const notesSection = cycleNotes.length
    ? `Progress notes:\n- ${cycleNotes.join("\n- ")}`
    : "No progress notes yet.";
  const lines = [
    "Continue solving the same user request.",
    `User request: ${userText}`,
    notesSection,
    `Pending staged changes: ${pendingCount}.`,
    "Do the next useful step. If done, produce final concise answer.",
  ];
  return lines.join("\n");
}
type GigachatTokenCache = {
token: string;
expiresAtSec: number;
@@ -322,6 +334,8 @@ export async function runLangGraphCrmAgentFor(input: {
teamId: string;
userId: string;
userText: string;
requestId?: string;
conversationId?: string;
onTrace?: (event: AgentTraceEvent) => Promise<void> | void;
}): Promise<AgentReply> {
const openrouterApiKey = (process.env.OPENROUTER_API_KEY ?? "").trim();
@@ -414,6 +428,16 @@ export async function runLangGraphCrmAgentFor(input: {
const pendingChanges: PendingChange[] = [];
async function emitTrace(event: AgentTraceEvent) {
lfTrace?.event({
name: "agent.trace",
input: {
text: event.text,
toolRun: event.toolRun ?? null,
},
metadata: {
requestId: input.requestId ?? null,
},
});
if (!input.onTrace) return;
try {
await input.onTrace(event);
@@ -544,7 +568,7 @@ export async function runLangGraphCrmAgentFor(input: {
const toolName = `crm:${raw.action}`;
const startedAt = new Date().toISOString();
toolsUsed.push(toolName);
await emitTrace({ text: `Tool started: ${toolName}` });
await emitTrace({ text: `Использую инструмент: ${toolName}` });
const executeAction = async () => {
if (raw.action === "get_snapshot") {
@@ -856,6 +880,23 @@ export async function runLangGraphCrmAgentFor(input: {
const maxCycles = Math.max(1, Math.min(Number(process.env.CF_AGENT_MAX_CYCLES ?? "3"), 8));
const cycleTimeoutMs = Math.max(5000, Math.min(Number(process.env.CF_AGENT_CYCLE_TIMEOUT_MS ?? "1200000"), 1800000));
const tracingFlag = (process.env.LANGSMITH_TRACING ?? process.env.LANGCHAIN_TRACING_V2 ?? "").trim().toLowerCase();
const tracingEnabled = tracingFlag === "1" || tracingFlag === "true" || tracingFlag === "yes";
const langfuse = getLangfuseClient();
const lfTrace = langfuse?.trace({
id: input.requestId ?? makeId("trace"),
name: "clientsflow.crm_agent_request",
userId: input.userId,
sessionId: input.conversationId ?? undefined,
input: input.userText,
metadata: {
teamId: input.teamId,
userId: input.userId,
requestId: input.requestId ?? null,
conversationId: input.conversationId ?? null,
},
tags: ["clientsflow", "crm-agent", "langgraph"],
});
let consecutiveNoProgress = 0;
let finalText = "";
const cycleNotes: string[] = [];
@@ -931,24 +972,34 @@ export async function runLangGraphCrmAgentFor(input: {
};
for (let cycle = 1; cycle <= maxCycles; cycle += 1) {
await emitTrace({ text: `Cycle ${cycle}/${maxCycles}: start` });
const userPrompt = cyclePrompt(input.userText, cycle, cycleNotes, pendingChanges.length);
const cycleSpan = lfTrace?.span({
name: "agent.cycle",
input: userPrompt,
metadata: {
cycle,
requestId: input.requestId ?? null,
},
});
await emitTrace({ text: "Анализирую задачу и текущий контекст CRM." });
const beforeRuns = toolRuns.length;
const beforeWrites = dbWrites.length;
const beforePending = pendingChanges.length;
const userPrompt =
cycle === 1
? input.userText
: [
"Continue solving the same user request.",
`User request: ${input.userText}`,
cycleNotes.length ? `Progress notes:\n- ${cycleNotes.join("\n- ")}` : "No progress notes yet.",
`Pending staged changes: ${pendingChanges.length}.`,
"Do the next useful step. If done, produce final concise answer.",
].join("\n");
let res: any;
try {
const invokeConfig: Record<string, any> = { recursionLimit: 30 };
if (tracingEnabled) {
invokeConfig.runName = "clientsflow.crm_agent_cycle";
invokeConfig.tags = ["clientsflow", "crm-agent", "langgraph"];
invokeConfig.metadata = {
teamId: input.teamId,
userId: input.userId,
requestId: input.requestId ?? null,
conversationId: input.conversationId ?? null,
cycle,
};
}
res = await Promise.race([
agent.invoke(
{
@@ -957,14 +1008,19 @@ export async function runLangGraphCrmAgentFor(input: {
{ role: "user", content: userPrompt },
],
},
{ recursionLimit: 30 },
invokeConfig,
),
new Promise((_resolve, reject) =>
setTimeout(() => reject(new Error(`Cycle timeout after ${cycleTimeoutMs}ms`)), cycleTimeoutMs),
),
]);
} catch (e: any) {
await emitTrace({ text: `Cycle ${cycle}/${maxCycles}: failed (${String(e?.message || e)})` });
await emitTrace({ text: "Один из шагов завершился ошибкой, пробую безопасный обход." });
cycleSpan?.end({
output: "error",
level: "ERROR",
statusMessage: String(e?.message ?? e ?? "unknown_error"),
});
if (!finalText) {
finalText = "Не удалось завершить задачу за отведенное время. Уточни запрос или сократи объем.";
}
@@ -978,12 +1034,23 @@ export async function runLangGraphCrmAgentFor(input: {
const progressed =
toolRuns.length > beforeRuns || dbWrites.length > beforeWrites || pendingChanges.length !== beforePending;
cycleSpan?.end({
output: parsed.text || "",
metadata: {
progressed,
toolRunsDelta: toolRuns.length - beforeRuns,
dbWritesDelta: dbWrites.length - beforeWrites,
pendingDelta: pendingChanges.length - beforePending,
},
});
if (progressed) {
cycleNotes.push(`Cycle ${cycle}: updated tools/data state.`);
}
await emitTrace({
text: `Cycle ${cycle}/${maxCycles}: ${progressed ? "progress" : "no progress"} · pending=${pendingChanges.length}`,
text: progressed
? "Продвигаюсь по задаче и обновляю рабочий набор изменений."
: "Промежуточный шаг не дал прогресса, проверяю следующий вариант.",
});
if (!progressed) {
@@ -994,16 +1061,28 @@ export async function runLangGraphCrmAgentFor(input: {
const done = (!progressed && cycle > 1) || cycle === maxCycles;
if (done) {
await emitTrace({ text: `Cycle ${cycle}/${maxCycles}: done` });
await emitTrace({ text: "Формирую итоговый ответ." });
break;
}
if (consecutiveNoProgress >= 2) {
await emitTrace({ text: `Cycle ${cycle}/${maxCycles}: stopped (no progress)` });
await emitTrace({ text: "Останавливаюсь, чтобы не крутиться в пустом цикле." });
break;
}
}
lfTrace?.update({
output: finalText || null,
metadata: {
toolsUsedCount: toolsUsed.length,
toolRunsCount: toolRuns.length,
dbWritesCount: dbWrites.length,
pendingChangesCount: pendingChanges.length,
maxCycles,
},
});
void langfuse?.flushAsync().catch(() => {});
if (!finalText) {
throw new Error("Model returned empty response");
}

View File

@@ -0,0 +1,62 @@
import { readBody } from "h3";
import { getAuthContext } from "../../../utils/auth";
import { prisma } from "../../../utils/prisma";
import { enqueueOutboundDelivery } from "../../../queues/outboundDelivery";
type EnqueueBody = {
omniMessageId?: string;
endpoint?: string;
method?: "POST" | "PUT" | "PATCH";
headers?: Record<string, string>;
payload?: unknown;
timeoutMs?: number;
provider?: string;
channel?: string;
attempts?: number;
};
/**
 * POST endpoint: validates the request and enqueues a generic outbound
 * delivery job for an existing omni message owned by the caller's team.
 */
export default defineEventHandler(async (event) => {
  const auth = await getAuthContext(event);
  const body = await readBody<EnqueueBody>(event);

  // Normalize and validate required fields before touching the database.
  const omniMessageId = String(body?.omniMessageId ?? "").trim();
  const endpoint = String(body?.endpoint ?? "").trim();
  if (!omniMessageId) {
    throw createError({ statusCode: 400, statusMessage: "omniMessageId is required" });
  }
  if (!endpoint) {
    throw createError({ statusCode: 400, statusMessage: "endpoint is required" });
  }

  // Scope the lookup to the caller's team so one team cannot enqueue another's messages.
  const msg = await prisma.omniMessage.findFirst({
    where: { id: omniMessageId, teamId: auth.teamId },
    select: { id: true },
  });
  if (!msg) {
    throw createError({ statusCode: 404, statusMessage: "omni message not found" });
  }

  // Clamp retry attempts to a sane 1..50 window (default 12).
  const attempts = Math.max(1, Math.min(Number(body?.attempts ?? 12), 50));

  const jobInput = {
    omniMessageId,
    endpoint,
    method: body?.method ?? "POST",
    headers: body?.headers ?? {},
    payload: body?.payload ?? {},
    timeoutMs: body?.timeoutMs,
    provider: body?.provider ?? undefined,
    channel: body?.channel ?? undefined,
  };
  const job = await enqueueOutboundDelivery(jobInput, { attempts });

  return {
    ok: true,
    queue: "omni-outbound",
    jobId: job.id,
    omniMessageId,
  };
});

View File

@@ -0,0 +1,32 @@
import { readBody } from "h3";
import { getAuthContext } from "../../../utils/auth";
import { prisma } from "../../../utils/prisma";
import { enqueueTelegramSend } from "../../../queues/telegramSend";
/**
 * POST endpoint: enqueues a Telegram send job for an existing outbound
 * TELEGRAM message owned by the caller's team.
 */
export default defineEventHandler(async (event) => {
  const auth = await getAuthContext(event);
  const body = await readBody<{ omniMessageId?: string; attempts?: number }>(event);

  const omniMessageId = String(body?.omniMessageId ?? "").trim();
  if (!omniMessageId) {
    throw createError({ statusCode: 400, statusMessage: "omniMessageId is required" });
  }

  // Only outbound Telegram messages belonging to the caller's team qualify.
  const msg = await prisma.omniMessage.findFirst({
    where: { id: omniMessageId, teamId: auth.teamId, channel: "TELEGRAM", direction: "OUT" },
    select: { id: true },
  });
  if (!msg) {
    throw createError({ statusCode: 404, statusMessage: "telegram outbound message not found" });
  }

  // Clamp retry attempts to 1..50 (default 12).
  const attempts = Math.max(1, Math.min(Number(body?.attempts ?? 12), 50));
  const job = await enqueueTelegramSend({ omniMessageId }, { attempts });

  return {
    ok: true,
    queue: "omni-outbound",
    jobId: job.id,
    omniMessageId,
  };
});

View File

@@ -0,0 +1,130 @@
import { readBody } from "h3";
import { createUIMessageStream, createUIMessageStreamResponse } from "ai";
import { getAuthContext } from "../utils/auth";
import { prisma } from "../utils/prisma";
import { buildChangeSet, captureSnapshot } from "../utils/changeSet";
import { persistChatMessage, runCrmAgentFor, type AgentTraceEvent } from "../agent/crmAgent";
/**
 * Concatenates all text parts of a UI message and trims the result.
 * Returns "" for messages without a parts array or with no text parts.
 */
function extractMessageText(message: any): string {
  if (!message || !Array.isArray(message.parts)) return "";
  const chunks: string[] = [];
  for (const part of message.parts) {
    if (part?.type === "text" && typeof part.text === "string") {
      chunks.push(part.text);
    }
  }
  return chunks.join("").trim();
}
/**
 * Scans the message list from newest to oldest and returns the text of the
 * most recent user message that has non-empty text; "" when none exists.
 */
function getLastUserText(messages: any[]): string {
  // Iterate a reversed copy so the original array is left untouched.
  for (const candidate of [...messages].reverse()) {
    if (candidate?.role !== "user") continue;
    const text = extractMessageText(candidate);
    if (text) return text;
  }
  return "";
}
/**
 * Maps a raw agent trace event to a short, user-friendly Russian status line.
 * Tool runs take priority; error/summary keywords get canned phrasings;
 * anything else passes through trimmed.
 */
function humanizeTraceText(trace: AgentTraceEvent): string {
  const toolName = trace.toolRun?.name;
  if (toolName) {
    return `Использую инструмент: ${toolName}`;
  }
  const text = (trace.text ?? "").trim();
  if (!text) return "Агент работает с данными CRM.";
  const lowered = text.toLowerCase();
  if (lowered.includes("ошиб")) return "Возникла ошибка шага, пробую другой путь.";
  if (lowered.includes("итог")) return "Готовлю финальный ответ.";
  return text;
}
/**
 * POST chat endpoint: runs the CRM agent for the latest user message and
 * streams the result back as a UI message stream (status log parts followed
 * by the final answer text). Both the user message and the assistant reply
 * are persisted, the latter with a before/after change set.
 */
export default defineEventHandler(async (event) => {
  // Resolves team/user/conversation for the caller; throws when unauthenticated.
  const auth = await getAuthContext(event);
  const body = await readBody<{ messages?: any[] }>(event);
  const messages = Array.isArray(body?.messages) ? body.messages : [];
  // The agent only needs the most recent non-empty user utterance.
  const userText = getLastUserText(messages);
  if (!userText) {
    throw createError({ statusCode: 400, statusMessage: "Last user message is required" });
  }
  // Correlates the user message, trace events and assistant reply in storage.
  const requestId = `req_${Date.now()}_${Math.floor(Math.random() * 1_000_000)}`;
  const stream = createUIMessageStream({
    execute: async ({ writer }) => {
      const textId = `text-${Date.now()}`;
      writer.write({ type: "start" });
      try {
        // Snapshot CRM state before the agent runs so we can diff afterwards.
        const snapshotBefore = await captureSnapshot(prisma, auth.teamId);
        // Persist the user's message before running the agent.
        // NOTE(review): assumes auth.conversationId is always set on the
        // auth context — confirm against getAuthContext.
        await persistChatMessage({
          teamId: auth.teamId,
          conversationId: auth.conversationId,
          authorUserId: auth.userId,
          role: "USER",
          text: userText,
          requestId,
          eventType: "user",
          phase: "final",
          transient: false,
        });
        const reply = await runCrmAgentFor({
          teamId: auth.teamId,
          userId: auth.userId,
          userText,
          requestId,
          conversationId: auth.conversationId,
          // Stream humanized trace lines to the client as data parts while
          // the agent is working.
          onTrace: async (trace: AgentTraceEvent) => {
            writer.write({
              type: "data-agent-log",
              data: {
                requestId,
                at: new Date().toISOString(),
                text: humanizeTraceText(trace),
              },
            });
          },
        });
        // Diff the CRM state to attach a change set to the assistant reply.
        const snapshotAfter = await captureSnapshot(prisma, auth.teamId);
        const changeSet = buildChangeSet(snapshotBefore, snapshotAfter);
        await persistChatMessage({
          teamId: auth.teamId,
          conversationId: auth.conversationId,
          authorUserId: null,
          role: "ASSISTANT",
          text: reply.text,
          requestId,
          eventType: "assistant",
          phase: "final",
          transient: false,
          changeSet,
        });
        // Emit the final answer as a single text part, then close the stream.
        writer.write({ type: "text-start", id: textId });
        writer.write({ type: "text-delta", id: textId, delta: reply.text });
        writer.write({ type: "text-end", id: textId });
        writer.write({ type: "finish", finishReason: "stop" });
      } catch (error: any) {
        // Surface the failure both as an agent-log data part and as the
        // visible answer text, so the client stream still terminates cleanly.
        writer.write({
          type: "data-agent-log",
          data: {
            requestId,
            at: new Date().toISOString(),
            text: "Ошибка выполнения агентского цикла.",
          },
        });
        writer.write({ type: "text-start", id: textId });
        writer.write({
          type: "text-delta",
          id: textId,
          delta: `Не удалось завершить задачу: ${String(error?.message ?? "unknown error")}`,
        });
        writer.write({ type: "text-end", id: textId });
        writer.write({ type: "finish", finishReason: "stop" });
      }
    },
  });
  return createUIMessageStreamResponse({ stream });
});

View File

@@ -0,0 +1,62 @@
import { readBody } from "h3";
import { getAuthContext } from "../utils/auth";
import { transcribeWithWhisper } from "../utils/whisper";
type TranscribeBody = {
audioBase64?: string;
sampleRate?: number;
language?: string;
};
/**
 * Decodes base64-encoded little-endian 16-bit PCM into normalized
 * Float32 samples in [-1, 1). A trailing odd byte is ignored; an empty
 * or sub-sample payload yields an empty array.
 */
function decodeBase64Pcm16(audioBase64: string) {
  const pcmBuffer = Buffer.from(audioBase64, "base64");
  const sampleCount = Math.floor(pcmBuffer.length / 2);
  const out = new Float32Array(sampleCount);
  for (let i = 0; i < sampleCount; i += 1) {
    // readInt16LE replaces the manual lo/hi byte assembly and sign fix-up.
    out[i] = pcmBuffer.readInt16LE(i * 2) / 32768;
  }
  return out;
}
/**
 * POST endpoint: transcribes base64 PCM16 audio with Whisper.
 * Requires authentication but does not bind the transcript to any record.
 */
export default defineEventHandler(async (event) => {
  // Auth gate only — the result is returned directly to the caller.
  await getAuthContext(event);
  const body = await readBody<TranscribeBody>(event);

  const audioBase64 = String(body?.audioBase64 ?? "").trim();
  const sampleRateRaw = Number(body?.sampleRate ?? 0);
  const language = String(body?.language ?? "").trim() || undefined;

  if (!audioBase64) {
    throw createError({ statusCode: 400, statusMessage: "audioBase64 is required" });
  }
  if (!Number.isFinite(sampleRateRaw) || sampleRateRaw < 8000 || sampleRateRaw > 48000) {
    throw createError({ statusCode: 400, statusMessage: "sampleRate must be between 8000 and 48000" });
  }

  const samples = decodeBase64Pcm16(audioBase64);
  if (samples.length === 0) {
    throw createError({ statusCode: 400, statusMessage: "Audio is empty" });
  }

  // Cap clip duration at 120 seconds of audio at the declared sample rate.
  const maxSamples = Math.floor(sampleRateRaw * 120);
  if (samples.length > maxSamples) {
    throw createError({ statusCode: 413, statusMessage: "Audio is too long (max 120s)" });
  }

  const text = await transcribeWithWhisper({ samples, sampleRate: sampleRateRaw, language });
  return { text };
});

View File

@@ -5,7 +5,6 @@ import { clearAuthSession, setSession } from "../utils/auth";
import { prisma } from "../utils/prisma";
import { normalizePhone, verifyPassword } from "../utils/password";
import { persistChatMessage, runCrmAgentFor } from "../agent/crmAgent";
import type { AgentTraceEvent } from "../agent/crmAgent";
import { buildChangeSet, captureSnapshot, rollbackChangeSet } from "../utils/changeSet";
import type { ChangeSet } from "../utils/changeSet";
@@ -210,6 +209,55 @@ async function selectChatConversation(auth: AuthContext | null, event: H3Event,
return { ok: true };
}
/**
 * Removes a chat conversation owned by the caller and repairs the session
 * so it always points at a surviving conversation.
 * NOTE(review): despite the name, this hard-deletes the row rather than
 * flagging it archived — confirm naming vs. intent.
 */
async function archiveChatConversation(auth: AuthContext | null, event: H3Event, id: string) {
  const ctx = requireAuth(auth);
  const convId = (id ?? "").trim();
  if (!convId) throw new Error("id is required");
  // Only the conversation's creator within the same team may remove it.
  const conversation = await prisma.chatConversation.findFirst({
    where: {
      id: convId,
      teamId: ctx.teamId,
      createdByUserId: ctx.userId,
    },
    select: { id: true },
  });
  if (!conversation) throw new Error("conversation not found");
  const nextConversationId = await prisma.$transaction(async (tx) => {
    await tx.chatConversation.delete({ where: { id: conversation.id } });
    // Session pointed elsewhere: nothing to fix.
    if (ctx.conversationId !== conversation.id) {
      return ctx.conversationId;
    }
    // Otherwise fall back to the user's most recently updated conversation...
    const fallback = await tx.chatConversation.findFirst({
      where: { teamId: ctx.teamId, createdByUserId: ctx.userId },
      orderBy: { updatedAt: "desc" },
      select: { id: true },
    });
    if (fallback) {
      return fallback.id;
    }
    // ...or create a fresh default so the session always has a valid target.
    const created = await tx.chatConversation.create({
      data: { teamId: ctx.teamId, createdByUserId: ctx.userId, title: "Pilot" },
      select: { id: true },
    });
    return created.id;
  });
  // Re-issue the session cookie pointing at the surviving conversation.
  setSession(event, {
    teamId: ctx.teamId,
    userId: ctx.userId,
    conversationId: nextConversationId,
  });
  return { ok: true };
}
async function getChatMessages(auth: AuthContext | null) {
const ctx = requireAuth(auth);
const items = await prisma.chatMessage.findMany({
@@ -219,25 +267,18 @@ async function getChatMessages(auth: AuthContext | null) {
});
return items.map((m) => {
const debug = (m.planJson as any) ?? {};
const cs = getChangeSetFromPlanJson(m.planJson);
return {
id: m.id,
role: m.role === "USER" ? "user" : m.role === "ASSISTANT" ? "assistant" : "system",
text: m.text,
thinking: Array.isArray(debug.thinking) ? (debug.thinking as string[]) : [],
tools: Array.isArray(debug.tools) ? (debug.tools as string[]) : [],
toolRuns: Array.isArray(debug.toolRuns)
? (debug.toolRuns as any[])
.filter((t) => t && typeof t === "object")
.map((t: any) => ({
name: String(t.name ?? "crm:unknown"),
status: t.status === "error" ? "error" : "ok",
input: String(t.input ?? ""),
output: String(t.output ?? ""),
at: t.at ? String(t.at) : m.createdAt.toISOString(),
}))
: [],
requestId: null,
eventType: null,
phase: null,
transient: null,
thinking: [],
tools: [],
toolRuns: [],
changeSetId: cs?.id ?? null,
changeStatus: cs?.status ?? null,
changeSummary: cs?.summary ?? null,
@@ -292,7 +333,10 @@ async function getDashboard(auth: AuthContext | null) {
}),
prisma.deal.findMany({
where: { teamId: ctx.teamId },
include: { contact: { select: { name: true, company: true } } },
include: {
contact: { select: { name: true, company: true } },
steps: { orderBy: [{ order: "asc" }, { createdAt: "asc" }] },
},
orderBy: { updatedAt: "desc" },
take: 500,
}),
@@ -366,6 +410,16 @@ async function getDashboard(auth: AuthContext | null) {
amount: d.amount ? String(d.amount) : "",
nextStep: d.nextStep ?? "",
summary: d.summary ?? "",
currentStepId: d.currentStepId ?? "",
steps: d.steps.map((step) => ({
id: step.id,
title: step.title,
description: step.description ?? "",
status: step.status,
dueAt: step.dueAt?.toISOString() ?? "",
order: step.order,
completedAt: step.completedAt?.toISOString() ?? "",
})),
}));
const feed = feedRaw.map((c) => ({
@@ -596,6 +650,7 @@ async function sendPilotMessage(auth: AuthContext | null, textInput: string) {
const ctx = requireAuth(auth);
const text = (textInput ?? "").trim();
if (!text) throw new Error("text is required");
const requestId = `req_${Date.now()}_${Math.floor(Math.random() * 1_000_000)}`;
const snapshotBefore = await captureSnapshot(prisma, ctx.teamId);
@@ -605,24 +660,19 @@ async function sendPilotMessage(auth: AuthContext | null, textInput: string) {
authorUserId: ctx.userId,
role: "USER",
text,
requestId,
eventType: "user",
phase: "final",
transient: false,
});
const reply = await runCrmAgentFor({
teamId: ctx.teamId,
userId: ctx.userId,
userText: text,
onTrace: async (event: AgentTraceEvent) => {
await persistChatMessage({
teamId: ctx.teamId,
conversationId: ctx.conversationId,
authorUserId: null,
role: "SYSTEM",
text: event.text,
thinking: [],
tools: event.toolRun ? [event.toolRun.name] : [],
toolRuns: event.toolRun ? [event.toolRun] : [],
});
},
requestId,
conversationId: ctx.conversationId,
onTrace: async () => {},
});
const snapshotAfter = await captureSnapshot(prisma, ctx.teamId);
@@ -634,9 +684,10 @@ async function sendPilotMessage(auth: AuthContext | null, textInput: string) {
authorUserId: null,
role: "ASSISTANT",
text: reply.text,
thinking: reply.thinking ?? [],
tools: reply.tools,
toolRuns: reply.toolRuns ?? [],
requestId,
eventType: "assistant",
phase: "final",
transient: false,
changeSet,
});
@@ -654,9 +705,6 @@ async function logPilotNote(auth: AuthContext | null, textInput: string) {
authorUserId: null,
role: "ASSISTANT",
text,
thinking: [],
tools: [],
toolRuns: [],
});
return { ok: true };
@@ -675,6 +723,7 @@ export const crmGraphqlSchema = buildSchema(`
logout: MutationResult!
createChatConversation(title: String): Conversation!
selectChatConversation(id: ID!): MutationResult!
archiveChatConversation(id: ID!): MutationResult!
sendPilotMessage(text: String!): MutationResult!
confirmLatestChangeSet: MutationResult!
rollbackLatestChangeSet: MutationResult!
@@ -743,6 +792,10 @@ export const crmGraphqlSchema = buildSchema(`
id: ID!
role: String!
text: String!
requestId: String
eventType: String
phase: String
transient: Boolean
thinking: [String!]!
tools: [String!]!
toolRuns: [PilotToolRun!]!
@@ -822,6 +875,18 @@ export const crmGraphqlSchema = buildSchema(`
amount: String!
nextStep: String!
summary: String!
currentStepId: String!
steps: [DealStep!]!
}
type DealStep {
id: ID!
title: String!
description: String!
status: String!
dueAt: String!
order: Int!
completedAt: String!
}
type FeedCard {
@@ -878,6 +943,9 @@ export const crmGraphqlRoot = {
selectChatConversation: async (args: { id: string }, context: GraphQLContext) =>
selectChatConversation(context.auth, context.event, args.id),
archiveChatConversation: async (args: { id: string }, context: GraphQLContext) =>
archiveChatConversation(context.auth, context.event, args.id),
sendPilotMessage: async (args: { text: string }, context: GraphQLContext) =>
sendPilotMessage(context.auth, args.text),

View File

@@ -0,0 +1,200 @@
import { Queue, Worker, type JobsOptions } from "bullmq";
import { prisma } from "../utils/prisma";
import { getRedis } from "../utils/redis";
export const OUTBOUND_DELIVERY_QUEUE_NAME = "omni-outbound";
export type OutboundDeliveryJob = {
omniMessageId: string;
endpoint: string;
method?: "POST" | "PUT" | "PATCH";
headers?: Record<string, string>;
payload: unknown;
timeoutMs?: number;
channel?: string;
provider?: string;
};
/**
 * Validates that `value` is an absolute http(s) URL and returns its
 * normalized string form. Throws on empty input, unparsable URLs
 * (URL constructor TypeError), and non-http(s) protocols.
 */
function ensureHttpUrl(value: string) {
  const raw = (value ?? "").trim();
  if (!raw) throw new Error("endpoint is required");
  const parsed = new URL(raw);
  const isHttp = parsed.protocol === "http:" || parsed.protocol === "https:";
  if (!isHttp) {
    throw new Error(`Unsupported endpoint protocol: ${parsed.protocol}`);
  }
  return parsed.toString();
}
/** Reduces an arbitrary thrown value to a short message string. */
function compactError(error: unknown) {
  if (!error) return "unknown_error";
  if (typeof error === "string") return error;
  const message = (error as any)?.message;
  return String(message ?? error);
}
/**
 * Pulls a provider-side message id out of a delivery response body, trying
 * common field spellings in priority order; null when none is present.
 */
function extractProviderMessageId(body: unknown): string | null {
  const obj = body as any;
  if (!obj || typeof obj !== "object") return null;
  const candidates = [
    obj?.message_id,
    obj?.messageId,
    obj?.id,
    obj?.result?.message_id,
    obj?.result?.id,
  ];
  for (const candidate of candidates) {
    // != null skips both null and undefined, matching the original ?? chain.
    if (candidate != null) return String(candidate);
  }
  return null;
}
// Builds a BullMQ queue handle for the outbound-delivery queue.
// NOTE(review): a new Queue instance is constructed on every call; frequent
// enqueuers may want a cached singleton — confirm whether getRedis() reuses
// one connection, which would make this cheap.
export function outboundDeliveryQueue() {
  return new Queue<OutboundDeliveryJob>(OUTBOUND_DELIVERY_QUEUE_NAME, {
    connection: getRedis(),
    defaultJobOptions: {
      // Keep bounded job history so Redis does not grow without limit.
      removeOnComplete: { count: 1000 },
      removeOnFail: { count: 5000 },
    },
  });
}
/**
 * Marks the source message PENDING (recording the delivery request in
 * rawJson) and enqueues an HTTP delivery job. The jobId `omni:<messageId>`
 * deduplicates repeated enqueues for the same message.
 * NOTE(review): rawJson is replaced wholesale here — any previously stored
 * provider payload on the message is lost; confirm that is intended.
 */
export async function enqueueOutboundDelivery(input: OutboundDeliveryJob, opts?: JobsOptions) {
  // Validate/normalize the endpoint before any DB or queue work.
  const endpoint = ensureHttpUrl(input.endpoint);
  const q = outboundDeliveryQueue();
  // Keep source message in pending before actual send starts.
  await prisma.omniMessage.update({
    where: { id: input.omniMessageId },
    data: {
      status: "PENDING",
      rawJson: {
        queue: {
          queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
          enqueuedAt: new Date().toISOString(),
        },
        deliveryRequest: {
          endpoint,
          method: input.method ?? "POST",
          channel: input.channel ?? null,
          provider: input.provider ?? null,
          payload: input.payload,
        },
      },
    },
  });
  // Caller-supplied opts may override attempts/backoff (spread last).
  return q.add("deliver", { ...input, endpoint }, {
    jobId: `omni:${input.omniMessageId}`,
    attempts: 12,
    backoff: { type: "exponential", delay: 1000 },
    ...opts,
  });
}
/**
 * Starts the BullMQ worker that performs the actual HTTP delivery for queued
 * outbound messages and records the outcome on the OmniMessage row.
 * Throwing from the processor lets BullMQ schedule the next retry attempt.
 */
export function startOutboundDeliveryWorker() {
  return new Worker<OutboundDeliveryJob>(
    OUTBOUND_DELIVERY_QUEUE_NAME,
    async (job) => {
      const msg = await prisma.omniMessage.findUnique({
        where: { id: job.data.omniMessageId },
        include: { thread: true },
      });
      // Message deleted since enqueue — nothing to deliver.
      if (!msg) return;
      // Idempotency: if already sent/delivered, do not resend.
      if ((msg.status === "SENT" || msg.status === "DELIVERED" || msg.status === "READ") && msg.providerMessageId) {
        return;
      }
      // Re-validate the endpoint at execution time as well as enqueue time.
      const endpoint = ensureHttpUrl(job.data.endpoint);
      // Clamp the HTTP timeout to 1s..120s (default 20s).
      const timeoutMs = Math.max(1000, Math.min(job.data.timeoutMs ?? 20000, 120000));
      const method = job.data.method ?? "POST";
      // Caller headers may override the default JSON content-type.
      const headers: Record<string, string> = {
        "content-type": "application/json",
        ...(job.data.headers ?? {}),
      };
      const requestStartedAt = new Date().toISOString();
      try {
        const response = await fetch(endpoint, {
          method,
          headers,
          body: JSON.stringify(job.data.payload ?? {}),
          signal: AbortSignal.timeout(timeoutMs),
        });
        // Providers may answer JSON or plain text; keep whichever parses.
        const text = await response.text();
        const responseBody = (() => {
          try {
            return JSON.parse(text);
          } catch {
            return text;
          }
        })();
        // Non-2xx responses are treated as retryable failures.
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${typeof responseBody === "string" ? responseBody : JSON.stringify(responseBody)}`);
        }
        const providerMessageId = extractProviderMessageId(responseBody);
        // Record success together with full request/response details.
        await prisma.omniMessage.update({
          where: { id: msg.id },
          data: {
            status: "SENT",
            providerMessageId,
            rawJson: {
              queue: {
                queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
                completedAt: new Date().toISOString(),
                attemptsMade: job.attemptsMade + 1,
              },
              deliveryRequest: {
                endpoint,
                method,
                channel: job.data.channel ?? null,
                provider: job.data.provider ?? null,
                startedAt: requestStartedAt,
                payload: job.data.payload ?? null,
              },
              deliveryResponse: {
                status: response.status,
                body: responseBody,
              },
            },
          },
        });
      } catch (error) {
        // Persist the FAILED state only once retries are exhausted, then
        // rethrow so BullMQ either retries or marks the job failed.
        const isLastAttempt =
          typeof job.opts.attempts === "number" && job.attemptsMade + 1 >= job.opts.attempts;
        if (isLastAttempt) {
          await prisma.omniMessage.update({
            where: { id: msg.id },
            data: {
              status: "FAILED",
              rawJson: {
                queue: {
                  queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
                  failedAt: new Date().toISOString(),
                  attemptsMade: job.attemptsMade + 1,
                },
                deliveryRequest: {
                  endpoint,
                  method,
                  channel: job.data.channel ?? null,
                  provider: job.data.provider ?? null,
                  startedAt: requestStartedAt,
                  payload: job.data.payload ?? null,
                },
                deliveryError: {
                  message: compactError(error),
                },
              },
            },
          });
        }
        throw error;
      }
    },
    { connection: getRedis() },
  );
}

View File

@@ -1,92 +1,43 @@
import { Queue, Worker, JobsOptions } from "bullmq";
import { getRedis } from "../utils/redis";
import type { JobsOptions } from "bullmq";
import { prisma } from "../utils/prisma";
import { telegramBotApi } from "../utils/telegram";
export const TELEGRAM_SEND_QUEUE_NAME = "telegram:send";
import { telegramApiBase, requireTelegramBotToken } from "../utils/telegram";
import { enqueueOutboundDelivery, startOutboundDeliveryWorker } from "./outboundDelivery";
type TelegramSendJob = {
omniMessageId: string;
};
export function telegramSendQueue() {
return new Queue<TelegramSendJob>(TELEGRAM_SEND_QUEUE_NAME, {
connection: getRedis(),
defaultJobOptions: {
removeOnComplete: { count: 1000 },
removeOnFail: { count: 5000 },
},
});
}
export async function enqueueTelegramSend(input: TelegramSendJob, opts?: JobsOptions) {
const q = telegramSendQueue();
return q.add("send", input, {
jobId: input.omniMessageId, // idempotency
attempts: 10,
backoff: { type: "exponential", delay: 1000 },
...opts,
const msg = await prisma.omniMessage.findUnique({
where: { id: input.omniMessageId },
include: { thread: true },
});
}
if (!msg) throw new Error(`omni message not found: ${input.omniMessageId}`);
if (msg.channel !== "TELEGRAM" || msg.direction !== "OUT") {
throw new Error(`Invalid omni message for telegram send: ${msg.id}`);
}
export function startTelegramSendWorker() {
return new Worker<TelegramSendJob>(
TELEGRAM_SEND_QUEUE_NAME,
async (job) => {
const msg = await prisma.omniMessage.findUnique({
where: { id: job.data.omniMessageId },
include: { thread: true },
});
if (!msg) return;
const token = requireTelegramBotToken();
const endpoint = `${telegramApiBase()}/bot${token}/sendMessage`;
const payload = {
chat_id: msg.thread.externalChatId,
text: msg.text,
...(msg.thread.businessConnectionId ? { business_connection_id: msg.thread.businessConnectionId } : {}),
};
// Idempotency: if we already sent it, don't send twice.
if (msg.status === "SENT" && msg.providerMessageId) return;
if (msg.channel !== "TELEGRAM" || msg.direction !== "OUT") {
throw new Error(`Invalid omni message for telegram send: ${msg.id}`);
}
const thread = msg.thread;
const chatId = thread.externalChatId;
const businessConnectionId = thread.businessConnectionId || undefined;
try {
const result = await telegramBotApi<any>("sendMessage", {
chat_id: chatId,
text: msg.text,
...(businessConnectionId ? { business_connection_id: businessConnectionId } : {}),
});
const providerMessageId = result?.message_id != null ? String(result.message_id) : null;
await prisma.omniMessage.update({
where: { id: msg.id },
data: {
status: "SENT",
providerMessageId: providerMessageId,
rawJson: result,
},
});
} catch (e: any) {
const isLastAttempt =
typeof job.opts.attempts === "number" && job.attemptsMade + 1 >= job.opts.attempts;
if (isLastAttempt) {
await prisma.omniMessage.update({
where: { id: msg.id },
data: {
status: "FAILED",
rawJson: {
error: String(e?.message || e),
attemptsMade: job.attemptsMade + 1,
},
},
});
}
throw e;
}
return enqueueOutboundDelivery(
{
omniMessageId: msg.id,
endpoint,
method: "POST",
payload,
provider: "telegram_business",
channel: "TELEGRAM",
},
{ connection: getRedis() },
opts,
);
}
export function startTelegramSendWorker() {
return startOutboundDeliveryWorker();
}

View File

@@ -0,0 +1,35 @@
import { startOutboundDeliveryWorker } from "./outboundDelivery";
import { prisma } from "../utils/prisma";
import { getRedis } from "../utils/redis";
// Boot the outbound-delivery worker as a standalone process.
const worker = startOutboundDeliveryWorker();
// Fix: the queue is named "omni-outbound" (see OUTBOUND_DELIVERY_QUEUE_NAME);
// the previous log line said "omni:outbound".
console.log("[delivery-worker] started queue omni-outbound");

/**
 * Graceful shutdown: stop consuming jobs, close the Redis connection,
 * disconnect Prisma, then exit. Each step is best-effort so one failing
 * resource does not block the rest of the teardown.
 */
async function shutdown(signal: string) {
  console.log(`[delivery-worker] shutting down by ${signal}`);
  try {
    await worker.close();
  } catch {
    // ignore shutdown errors
  }
  try {
    const redis = getRedis();
    await redis.quit();
  } catch {
    // ignore shutdown errors
  }
  try {
    await prisma.$disconnect();
  } catch {
    // ignore shutdown errors
  }
  process.exit(0);
}

process.on("SIGINT", () => {
  void shutdown("SIGINT");
});
process.on("SIGTERM", () => {
  void shutdown("SIGTERM");
});

View File

@@ -0,0 +1,29 @@
import { Langfuse } from "langfuse";
let client: Langfuse | null = null;
/** Returns true for common truthy env spellings: 1/true/yes/on (case-insensitive). */
function isTruthy(value: string | undefined) {
  const normalized = (value ?? "").trim().toLowerCase();
  return ["1", "true", "yes", "on"].includes(normalized);
}

/**
 * Langfuse is enabled when both API keys are present, unless LANGFUSE_ENABLED
 * is set to an explicitly falsy value.
 */
export function isLangfuseEnabled() {
  const enabledRaw = process.env.LANGFUSE_ENABLED;
  if (enabledRaw && !isTruthy(enabledRaw)) return false;
  const publicKey = (process.env.LANGFUSE_PUBLIC_KEY ?? "").trim();
  const secretKey = (process.env.LANGFUSE_SECRET_KEY ?? "").trim();
  return Boolean(publicKey && secretKey);
}
/** Lazily builds a process-wide Langfuse client; null when tracing is disabled. */
export function getLangfuseClient() {
  if (!isLangfuseEnabled()) return null;
  if (!client) {
    client = new Langfuse({
      publicKey: (process.env.LANGFUSE_PUBLIC_KEY ?? "").trim(),
      secretKey: (process.env.LANGFUSE_SECRET_KEY ?? "").trim(),
      baseUrl: (process.env.LANGFUSE_BASE_URL ?? "http://langfuse-web:3000").trim(),
      enabled: true,
    });
  }
  return client;
}

View File

@@ -0,0 +1,53 @@
type WhisperTranscribeInput = {
samples: Float32Array;
sampleRate: number;
language?: string;
};
let whisperPipelinePromise: Promise<any> | null = null;
let transformersPromise: Promise<any> | null = null;
/** Whisper model id from CF_WHISPER_MODEL, defaulting to Xenova/whisper-small. */
function getWhisperModelId() {
  const configured = (process.env.CF_WHISPER_MODEL ?? "").trim();
  return configured || "Xenova/whisper-small";
}
/** Transcription language from CF_WHISPER_LANGUAGE, defaulting to "ru". */
function getWhisperLanguage() {
  const configured = (process.env.CF_WHISPER_LANGUAGE ?? "").trim();
  return configured || "ru";
}
/**
 * Lazily loads @xenova/transformers and builds (at most once per process) an
 * automatic-speech-recognition pipeline for the configured Whisper model.
 * Both the module import and the pipeline construction are cached as
 * promises so concurrent callers share the same initialization.
 */
async function getWhisperPipeline() {
  if (!transformersPromise) {
    transformersPromise = import("@xenova/transformers");
  }
  const { env, pipeline } = await transformersPromise;
  if (!whisperPipelinePromise) {
    // Allow downloading model weights and cache them under the app data dir.
    env.allowRemoteModels = true;
    env.allowLocalModels = true;
    env.cacheDir = "/app/.data/transformers";
    const modelId = getWhisperModelId();
    whisperPipelinePromise = pipeline("automatic-speech-recognition", modelId);
  }
  return whisperPipelinePromise;
}
/**
 * Runs Whisper ASR over normalized Float32 PCM samples and returns the
 * trimmed transcript ("" when nothing was recognized).
 * Chunked decoding (20s windows with 5s stride) bounds memory for long clips.
 */
export async function transcribeWithWhisper(input: WhisperTranscribeInput) {
  const transcriber = (await getWhisperPipeline()) as any;
  const result = await transcriber(
    input.samples,
    {
      sampling_rate: input.sampleRate,
      // Explicit language beats the env default; final "ru" guards empty strings.
      language: (input.language ?? getWhisperLanguage()) || "ru",
      task: "transcribe",
      chunk_length_s: 20,
      stride_length_s: 5,
      return_timestamps: false,
    },
  );
  const text = String((result as any)?.text ?? "").trim();
  return text;
}
}