Add multi-cycle agent loop with progress checks and timeouts

This commit is contained in:
Ruslan Bakiev
2026-02-19 06:42:24 +07:00
parent 39dcfc97c4
commit a8b8d77c3e

View File

@@ -859,13 +859,23 @@ export async function runLangGraphCrmAgentFor(input: {
tools: [crmTool],
responseFormat: z.object({
answer: z.string().describe("Final assistant answer for the user."),
done: z.boolean().describe("Whether objective is complete in this cycle."),
progressSummary: z.string().optional().describe("One-line progress note for system trace."),
nextStep: z.string().optional().describe("Short next step if not done."),
}),
});
const maxCycles = Math.max(1, Math.min(Number(process.env.CF_AGENT_MAX_CYCLES ?? "3"), 8));
const cycleTimeoutMs = Math.max(5000, Math.min(Number(process.env.CF_AGENT_CYCLE_TIMEOUT_MS ?? "45000"), 180000));
let consecutiveNoProgress = 0;
let finalText = "";
const cycleNotes: string[] = [];
const system = [
"You are Pilot, a CRM assistant.",
"Rules:",
"- Be concrete and concise.",
"- Work in short iterative cycles. Do not stop after the first thought if the task needs more than one action.",
"- You are given a structured CRM JSON snapshot as baseline context.",
"- If you need fresher or narrower data, call crm.get_snapshot/query_* tools.",
"- For changes, stage first with mode=stage. Commit only when user asks to execute.",
@@ -877,16 +887,6 @@ export async function runLangGraphCrmAgentFor(input: {
snapshotJson,
].join("\n");
const res: any = await agent.invoke(
{
messages: [
{ role: "system", content: system },
{ role: "user", content: input.userText },
],
},
{ recursionLimit: 30 },
);
const extractText = (value: unknown, depth = 0): string => {
if (depth > 5 || value == null) return "";
if (typeof value === "string") return value.trim();
@@ -923,31 +923,116 @@ export async function runLangGraphCrmAgentFor(input: {
return String(msg?.type ?? msg?.role ?? msg?.constructor?.name ?? "");
};
const structured = res?.structuredResponse as { answer?: string } | undefined;
const fallbackText = (() => {
const messages = Array.isArray(res?.messages) ? res.messages : [];
for (let i = messages.length - 1; i >= 0; i -= 1) {
const msg = messages[i];
const type = messageType(msg).toLowerCase();
if (!type.includes("ai") && !type.includes("assistant")) continue;
const text = extractText(msg?.content) || extractText(msg);
if (text) return text;
const extractResult = (res: any) => {
const structured = res?.structuredResponse as
| { answer?: string; done?: boolean; progressSummary?: string; nextStep?: string }
| undefined;
const fallbackText = (() => {
const messages = Array.isArray(res?.messages) ? res.messages : [];
for (let i = messages.length - 1; i >= 0; i -= 1) {
const msg = messages[i];
const type = messageType(msg).toLowerCase();
if (!type.includes("ai") && !type.includes("assistant")) continue;
const text = extractText(msg?.content) || extractText(msg);
if (text) return text;
}
return extractText(res?.output) || extractText(res?.response) || extractText(res?.finalResponse) || "";
})();
return {
text: (structured?.answer?.trim() || fallbackText).trim(),
done: typeof structured?.done === "boolean" ? structured.done : undefined,
progressSummary: (structured?.progressSummary ?? "").trim(),
nextStep: (structured?.nextStep ?? "").trim(),
};
};
for (let cycle = 1; cycle <= maxCycles; cycle += 1) {
await emitTrace({ text: `Cycle ${cycle}/${maxCycles}: start` });
const beforeRuns = toolRuns.length;
const beforeWrites = dbWrites.length;
const beforePending = pendingChanges.length;
const userPrompt =
cycle === 1
? input.userText
: [
"Continue solving the same user request.",
`User request: ${input.userText}`,
cycleNotes.length ? `Progress notes:\n- ${cycleNotes.join("\n- ")}` : "No progress notes yet.",
`Pending staged changes: ${pendingChanges.length}.`,
"Do the next useful step. If done, produce final concise answer.",
].join("\n");
let res: any;
try {
res = await Promise.race([
agent.invoke(
{
messages: [
{ role: "system", content: system },
{ role: "user", content: userPrompt },
],
},
{ recursionLimit: 30 },
),
new Promise((_resolve, reject) =>
setTimeout(() => reject(new Error(`Cycle timeout after ${cycleTimeoutMs}ms`)), cycleTimeoutMs),
),
]);
} catch (e: any) {
await emitTrace({ text: `Cycle ${cycle}/${maxCycles}: failed (${String(e?.message || e)})` });
if (!finalText) {
finalText = "Не удалось завершить задачу за отведенное время. Уточни запрос или сократи объем.";
}
break;
}
return (
extractText(res?.output) ||
extractText(res?.response) ||
extractText(res?.finalResponse) ||
""
);
})();
const text = structured?.answer?.trim() || fallbackText;
if (!text) {
const parsed = extractResult(res);
if (parsed.text) {
finalText = parsed.text;
}
const progressed =
toolRuns.length > beforeRuns || dbWrites.length > beforeWrites || pendingChanges.length !== beforePending;
if (parsed.progressSummary) {
cycleNotes.push(parsed.progressSummary);
} else if (progressed) {
cycleNotes.push(`Cycle ${cycle}: updated tools/data state.`);
}
await emitTrace({
text: `Cycle ${cycle}/${maxCycles}: ${progressed ? "progress" : "no progress"} · pending=${pendingChanges.length}`,
});
if (!progressed) {
consecutiveNoProgress += 1;
} else {
consecutiveNoProgress = 0;
}
const done =
typeof parsed.done === "boolean"
? parsed.done
: (!progressed && cycle > 1) || cycle === maxCycles;
if (done) {
await emitTrace({ text: `Cycle ${cycle}/${maxCycles}: done` });
break;
}
if (consecutiveNoProgress >= 2) {
await emitTrace({ text: `Cycle ${cycle}/${maxCycles}: stopped (no progress)` });
break;
}
}
if (!finalText) {
throw new Error("Model returned empty response");
}
const plan: string[] = [];
return {
text,
text: finalText,
plan,
thinking: [],
tools: toolsUsed,