Remove hard timeouts and fallback paths in chat flow

This commit is contained in:
Ruslan Bakiev
2026-02-20 10:05:33 +07:00
parent d49b00d688
commit b3602d142e
10 changed files with 49 additions and 99 deletions

View File

@@ -361,7 +361,6 @@ let pilotWaveRecordPlugin: any = null;
let pilotWaveMicSession: { onDestroy: () => void; onEnd: () => void } | null = null;
const commCallWaveHosts = new Map<string, HTMLDivElement>();
const commCallWaveSurfers = new Map<string, any>();
const FALLBACK_CALL_AUDIO_URL = "/audio-samples/national-road-9.m4a";
const callTranscriptOpen = ref<Record<string, boolean>>({});
const callTranscriptLoading = ref<Record<string, boolean>>({});
const callTranscriptText = ref<Record<string, string>>({});
@@ -731,23 +730,15 @@ async function sendPilotText(rawText: string) {
const text = rawText.trim();
if (!text || pilotSending.value) return;
const sendTimeoutMs = 45000;
pilotSending.value = true;
pilotInput.value = "";
livePilotUserText.value = text;
livePilotAssistantText.value = "";
pilotLiveLogs.value = [];
try {
await Promise.race([
pilotChat.sendMessage({ text }),
new Promise((_, reject) => {
setTimeout(() => reject(new Error("pilot_send_timeout")), sendTimeoutMs);
}),
]);
} catch (error: any) {
if (error?.message !== "pilot_send_timeout") {
pilotInput.value = text;
}
await pilotChat.sendMessage({ text });
} catch {
pilotInput.value = text;
} finally {
const latestAssistant = [...pilotChat.messages]
.reverse()
@@ -840,8 +831,7 @@ function buildCallWavePeaks(item: CommItem, size = 320) {
}
function getCallAudioUrl(item?: CommItem) {
const direct = String(item?.audioUrl ?? "").trim();
return direct || FALLBACK_CALL_AUDIO_URL;
return String(item?.audioUrl ?? "").trim();
}
async function ensureCommCallWave(itemId: string) {
@@ -1688,7 +1678,7 @@ const selectedCommThread = computed(() =>
commThreads.value.find((thread) => thread.id === selectedCommThreadId.value),
);
const commSendChannel = ref<CommItem["channel"]>("Telegram");
const commSendChannel = ref<CommItem["channel"] | "">("");
const commPinnedOnly = ref(false);
const commDraft = ref("");
const commSending = ref(false);
@@ -1735,14 +1725,14 @@ watch(selectedCommThreadId, () => {
eventArchiveRecordingById.value = {};
eventArchiveTranscribingById.value = {};
eventArchiveMicErrorById.value = {};
const fallback = selectedCommThread.value?.channels.find((channel) => channel !== "Phone") ?? "Telegram";
commSendChannel.value = fallback;
const preferred = selectedCommThread.value?.channels.find((channel) => channel !== "Phone") ?? "";
commSendChannel.value = preferred;
});
const commSendChannelOptions = computed<CommItem["channel"][]>(() => {
if (!selectedCommThread.value) return ["Telegram"];
if (!selectedCommThread.value) return [];
const items = selectedCommThread.value.channels.filter((channel) => channel !== "Phone");
return items.length ? items : ["Telegram"];
return items;
});
const visibleThreadItems = computed(() => {
@@ -2468,8 +2458,8 @@ async function sendCommMessage() {
commSending.value = true;
try {
const fallback = selectedCommThread.value.channels.find((channel) => channel !== "Phone") ?? "Telegram";
const channel = commSendChannel.value || fallback;
const channel = commSendChannel.value;
if (!channel) return;
await gqlFetch<{ createCommunication: { ok: boolean; id: string } }>(createCommunicationMutation, {
input: {
@@ -3758,7 +3748,7 @@ async function decideFeedCard(card: FeedCard, decision: "accepted" | "rejected")
:disabled="commSending"
:title="`Channel: ${commSendChannel}`"
>
<span class="mr-1">{{ commSendChannel }}</span>
<span class="mr-1">{{ commSendChannel || "Channel" }}</span>
<svg viewBox="0 0 20 20" class="h-3.5 w-3.5 fill-current">
<path fill-rule="evenodd" clip-rule="evenodd" d="M5.22 8.22a.75.75 0 0 1 1.06 0L10 11.94l3.72-3.72a.75.75 0 1 1 1.06 1.06l-4.25 4.25a.75.75 0 0 1-1.06 0L5.22 9.28a.75.75 0 0 1 0-1.06Z" />
</svg>
@@ -3799,7 +3789,7 @@ async function decideFeedCard(card: FeedCard, decision: "accepted" | "rejected")
<button
class="btn btn-sm btn-circle border-0 bg-[#5865f2] text-white hover:bg-[#4752c4]"
:disabled="commSending || commEventSaving || !commDraft.trim()"
:disabled="commSending || commEventSaving || !commDraft.trim() || (commComposerMode === 'message' && !commSendChannel)"
:title="commComposerMode === 'message' ? `Send via ${commSendChannel}` : (commComposerMode === 'logged' ? 'Save log event' : 'Create event')"
@click="commComposerMode === 'message' ? sendCommMessage() : createCommEvent()"
>

View File

@@ -29,7 +29,7 @@ else
fi
# Wait until PostgreSQL is reachable before applying schema.
until node -e "const u=new URL(process.env.DATABASE_URL||''); const net=require('net'); const s=net.createConnection({host:u.hostname,port:Number(u.port||5432)}); s.on('connect',()=>{s.end(); process.exit(0);}); s.on('error',()=>process.exit(1)); setTimeout(()=>process.exit(1), 1000);" ; do
until node -e "const u=new URL(process.env.DATABASE_URL||''); const net=require('net'); const s=net.createConnection({host:u.hostname,port:Number(u.port||5432)}); s.on('connect',()=>{s.end(); process.exit(0);}); s.on('error',()=>process.exit(1));" ; do
echo "Waiting for PostgreSQL..."
sleep 1
done

View File

@@ -21,7 +21,7 @@ fi
npx prisma generate
# Ensure DB is reachable before the worker starts consuming jobs.
until node -e "const u=new URL(process.env.DATABASE_URL||''); const net=require('net'); const s=net.createConnection({host:u.hostname,port:Number(u.port||5432)}); s.on('connect',()=>{s.end(); process.exit(0);}); s.on('error',()=>process.exit(1)); setTimeout(()=>process.exit(1), 1000);" ; do
until node -e "const u=new URL(process.env.DATABASE_URL||''); const net=require('net'); const s=net.createConnection({host:u.hostname,port:Number(u.port||5432)}); s.on('connect',()=>{s.end(); process.exit(0);}); s.on('error',()=>process.exit(1));" ; do
echo "Waiting for PostgreSQL..."
sleep 1
done

View File

@@ -225,7 +225,7 @@ export async function runCrmAgentFor(
}
throw new Error(
"Rule mode has no fallback responses. Use a supported structured query or switch to langgraph mode with a configured LLM API key.",
"Rule mode supports only structured built-in queries. Use a supported query or switch to langgraph mode with a configured LLM API key.",
);
}

View File

@@ -430,13 +430,7 @@ export async function runLangGraphCrmAgentFor(input: {
llmBaseURL = gigachatBaseUrl;
llmModel = gigachatModel || "GigaChat-2-Max";
} catch (e: any) {
return {
text: `Не удалось получить токен GigaChat: ${String(e?.message || e)}`,
plan: ["Проверить GIGACHAT_AUTH_KEY", "Проверить GIGACHAT_SCOPE", "Проверить сетевой доступ до OAuth endpoint и перезапустить dev-сервер"],
tools: [],
thinking: ["Провайдер GigaChat настроен, но OAuth не прошел."],
toolRuns: [],
};
throw new Error(`Не удалось получить токен GigaChat: ${String(e?.message || e)}`);
}
}
@@ -918,8 +912,6 @@ export async function runLangGraphCrmAgentFor(input: {
tools: [crmTool],
});
const maxCycles = Math.max(1, Math.min(Number(process.env.CF_AGENT_MAX_CYCLES ?? "3"), 8));
const cycleTimeoutMs = Math.max(5000, Math.min(Number(process.env.CF_AGENT_CYCLE_TIMEOUT_MS ?? "1200000"), 1800000));
const tracingFlag = (process.env.LANGSMITH_TRACING ?? process.env.LANGCHAIN_TRACING_V2 ?? "").trim().toLowerCase();
const tracingEnabled = tracingFlag === "1" || tracingFlag === "true" || tracingFlag === "yes";
const langfuse = getLangfuseClient();
@@ -937,7 +929,6 @@ export async function runLangGraphCrmAgentFor(input: {
},
tags: ["clientsflow", "crm-agent", "langgraph"],
});
let consecutiveNoProgress = 0;
let finalText = "";
const cycleNotes: string[] = [];
@@ -1011,7 +1002,7 @@ export async function runLangGraphCrmAgentFor(input: {
};
};
for (let cycle = 1; cycle <= maxCycles; cycle += 1) {
for (let cycle = 1; ; cycle += 1) {
const userPrompt = cyclePrompt(input.userText, cycle, cycleNotes, pendingChanges.length);
const cycleSpan = lfTrace?.span({
name: "agent.cycle",
@@ -1028,7 +1019,11 @@ export async function runLangGraphCrmAgentFor(input: {
let res: any;
try {
const invokeConfig: Record<string, any> = { recursionLimit: 30 };
const invokeConfig: Record<string, any> = {};
const recursionLimit = Number(process.env.CF_AGENT_RECURSION_LIMIT ?? "1000000");
if (Number.isFinite(recursionLimit) && recursionLimit > 0) {
invokeConfig.recursionLimit = recursionLimit;
}
if (tracingEnabled) {
invokeConfig.runName = "clientsflow.crm_agent_cycle";
invokeConfig.tags = ["clientsflow", "crm-agent", "langgraph"];
@@ -1040,20 +1035,15 @@ export async function runLangGraphCrmAgentFor(input: {
cycle,
};
}
res = await Promise.race([
agent.invoke(
{
messages: [
{ role: "system", content: system },
{ role: "user", content: userPrompt },
],
},
invokeConfig,
),
new Promise((_resolve, reject) =>
setTimeout(() => reject(new Error(`Cycle timeout after ${cycleTimeoutMs}ms`)), cycleTimeoutMs),
),
]);
res = await agent.invoke(
{
messages: [
{ role: "system", content: system },
{ role: "user", content: userPrompt },
],
},
invokeConfig,
);
} catch (e: any) {
await emitTrace({ text: "Один из шагов завершился ошибкой." });
cycleSpan?.end({
@@ -1090,22 +1080,11 @@ export async function runLangGraphCrmAgentFor(input: {
: "Промежуточный шаг не дал прогресса, проверяю следующий вариант.",
});
if (!progressed) {
consecutiveNoProgress += 1;
} else {
consecutiveNoProgress = 0;
}
const done = (!progressed && cycle > 1) || cycle === maxCycles;
const done = !progressed && cycle > 1;
if (done) {
await emitTrace({ text: "Формирую итоговый ответ." });
break;
}
if (consecutiveNoProgress >= 2) {
await emitTrace({ text: "Останавливаюсь, чтобы не крутиться в пустом цикле." });
break;
}
}
lfTrace?.update({
@@ -1115,7 +1094,6 @@ export async function runLangGraphCrmAgentFor(input: {
toolRunsCount: toolRuns.length,
dbWritesCount: dbWrites.length,
pendingChangesCount: pendingChanges.length,
maxCycles,
},
});
void langfuse?.flushAsync().catch(() => {});

View File

@@ -9,7 +9,6 @@ type EnqueueBody = {
method?: "POST" | "PUT" | "PATCH";
headers?: Record<string, string>;
payload?: unknown;
timeoutMs?: number;
provider?: string;
channel?: string;
attempts?: number;
@@ -44,7 +43,6 @@ export default defineEventHandler(async (event) => {
method: body?.method ?? "POST",
headers: body?.headers ?? {},
payload: body?.payload ?? {},
timeoutMs: body?.timeoutMs,
provider: body?.provider ?? undefined,
channel: body?.channel ?? undefined,
},

View File

@@ -106,6 +106,20 @@ export default defineEventHandler(async (event) => {
writer.write({ type: "text-end", id: textId });
writer.write({ type: "finish", finishReason: "stop" });
} catch (error: any) {
const errorText = String(error?.message ?? error);
await persistChatMessage({
teamId: auth.teamId,
conversationId: auth.conversationId,
authorUserId: null,
role: "ASSISTANT",
text: errorText,
requestId,
eventType: "assistant",
phase: "error",
transient: false,
});
writer.write({
type: "data-agent-log",
data: {
@@ -118,7 +132,7 @@ export default defineEventHandler(async (event) => {
writer.write({
type: "text-delta",
id: textId,
delta: `Не удалось завершить задачу: ${String(error?.message ?? "unknown error")}`,
delta: errorText,
});
writer.write({ type: "text-end", id: textId });
writer.write({ type: "finish", finishReason: "stop" });

View File

@@ -232,16 +232,6 @@ async function archiveChatConversation(auth: AuthContext | null, event: H3Event,
return ctx.conversationId;
}
const fallback = await tx.chatConversation.findFirst({
where: { teamId: ctx.teamId, createdByUserId: ctx.userId },
orderBy: { updatedAt: "desc" },
select: { id: true },
});
if (fallback) {
return fallback.id;
}
const created = await tx.chatConversation.create({
data: { teamId: ctx.teamId, createdByUserId: ctx.userId, title: "Pilot" },
select: { id: true },

View File

@@ -10,7 +10,6 @@ export type OutboundDeliveryJob = {
method?: "POST" | "PUT" | "PATCH";
headers?: Record<string, string>;
payload: unknown;
timeoutMs?: number;
channel?: string;
provider?: string;
};
@@ -119,7 +118,6 @@ export function startOutboundDeliveryWorker() {
}
const endpoint = ensureHttpUrl(job.data.endpoint);
const timeoutMs = Math.max(1000, Math.min(job.data.timeoutMs ?? 20000, 120000));
const method = job.data.method ?? "POST";
const headers: Record<string, string> = {
"content-type": "application/json",
@@ -133,7 +131,6 @@ export function startOutboundDeliveryWorker() {
method,
headers,
body: JSON.stringify(requestPayload ?? {}),
signal: AbortSignal.timeout(timeoutMs),
});
const text = await response.text();

View File

@@ -69,24 +69,7 @@ export async function getAuthContext(event: H3Event): Promise<AuthContext> {
});
if (!conv) {
// Recover from stale conversation cookie after rebuild/reset:
// reuse latest available conversation (or recreate default one) and refresh cookie.
const fallback =
(await prisma.chatConversation.findFirst({
where: { teamId: team.id, createdByUserId: user.id },
orderBy: { updatedAt: "desc" },
})) ||
(await prisma.chatConversation.create({
data: { id: `pilot-${team.id}`, teamId: team.id, createdByUserId: user.id, title: "Pilot" },
}));
setSession(event, {
teamId: team.id,
userId: user.id,
conversationId: fallback.id,
});
return { teamId: team.id, userId: user.id, conversationId: fallback.id };
throw createError({ statusCode: 401, statusMessage: "Unauthorized" });
}
return { teamId: team.id, userId: user.id, conversationId: conv.id };