Files
clientsflow/schedulers/run.mjs

119 lines
3.8 KiB
JavaScript

import crypto from "node:crypto";
import { Pool } from "pg";
function readIntEnv(name, defaultValue) {
const raw = String(process.env[name] ?? "").trim();
if (!raw) return defaultValue;
const parsed = Number.parseInt(raw, 10);
return Number.isFinite(parsed) ? parsed : defaultValue;
}
function makeTimelineEntryId() {
return `ctle_${Date.now().toString(36)}_${crypto.randomBytes(8).toString("hex")}`;
}
async function run() {
const databaseUrl = String(process.env.DATABASE_URL ?? "").trim();
if (!databaseUrl) {
throw new Error("DATABASE_URL is required");
}
const preDueMinutes = Math.max(1, readIntEnv("TIMELINE_EVENT_PREDUE_MINUTES", 30));
const lookbackMinutes = Math.max(preDueMinutes, readIntEnv("TIMELINE_EVENT_LOOKBACK_MINUTES", 180));
const lookaheadMinutes = Math.max(preDueMinutes, readIntEnv("TIMELINE_EVENT_LOOKAHEAD_MINUTES", 1440));
const lockKey = readIntEnv("TIMELINE_SCHEDULER_LOCK_KEY", 603001);
const now = new Date();
const rangeStart = new Date(now.getTime() - lookbackMinutes * 60_000);
const rangeEnd = new Date(now.getTime() + lookaheadMinutes * 60_000);
const pool = new Pool({ connectionString: databaseUrl });
const client = await pool.connect();
try {
const lockRes = await client.query("SELECT pg_try_advisory_lock($1) AS locked", [lockKey]);
const locked = Boolean(lockRes.rows?.[0]?.locked);
if (!locked) {
console.log(`[timeline-calendar-scheduler] skipped: lock ${lockKey} is busy`);
return;
}
try {
const eventsRes = await client.query(
`
SELECT
"id",
"teamId",
"contactId",
"startsAt"
FROM "CalendarEvent"
WHERE
"isArchived" = FALSE
AND "contactId" IS NOT NULL
AND "startsAt" >= $1
AND "startsAt" <= $2
ORDER BY "startsAt" ASC
`,
[rangeStart.toISOString(), rangeEnd.toISOString()],
);
let touched = 0;
let skippedBeforeWindow = 0;
for (const event of eventsRes.rows) {
const contactId = String(event.contactId ?? "").trim();
const teamId = String(event.teamId ?? "").trim();
const contentId = String(event.id ?? "").trim();
const startsAt = new Date(event.startsAt);
if (!contactId || !teamId || !contentId || Number.isNaN(startsAt.getTime())) continue;
const preDueAt = new Date(startsAt.getTime() - preDueMinutes * 60_000);
if (now < preDueAt) {
skippedBeforeWindow += 1;
continue;
}
await client.query(
`
INSERT INTO "ClientTimelineEntry" (
"id",
"teamId",
"contactId",
"contentType",
"contentId",
"datetime",
"createdAt",
"updatedAt"
)
VALUES ($1, $2, $3, 'CALENDAR_EVENT', $4, $5, NOW(), NOW())
ON CONFLICT ("teamId", "contentType", "contentId")
DO UPDATE SET
"contactId" = EXCLUDED."contactId",
"datetime" = EXCLUDED."datetime",
"updatedAt" = NOW()
`,
[makeTimelineEntryId(), teamId, contactId, contentId, preDueAt.toISOString()],
);
touched += 1;
}
console.log(
`[timeline-calendar-scheduler] done: scanned=${eventsRes.rowCount ?? 0} updated=${touched} skipped_before_window=${skippedBeforeWindow} at=${now.toISOString()}`,
);
} finally {
await client.query("SELECT pg_advisory_unlock($1)", [lockKey]).catch(() => undefined);
}
} finally {
client.release();
await pool.end();
}
}
run().catch((error) => {
const message = error instanceof Error ? error.stack || error.message : String(error);
console.error(`[timeline-calendar-scheduler] failed: ${message}`);
process.exitCode = 1;
});