119 lines
3.8 KiB
JavaScript
119 lines
3.8 KiB
JavaScript
import crypto from "node:crypto";
|
|
import { Pool } from "pg";
|
|
|
|
/**
 * Reads an integer setting from an environment variable.
 *
 * The value is trimmed and must consist solely of an optionally signed run of
 * decimal digits. Anything else (unset, empty, "30m", "1.5", "abc") yields
 * `defaultValue`, as do values too large to be represented exactly.
 *
 * @param {string} name - Environment variable name to look up in `process.env`.
 * @param {number} defaultValue - Value returned when the variable is missing or not a valid integer.
 * @returns {number} The parsed integer, or `defaultValue`.
 */
function readIntEnv(name, defaultValue) {
  const raw = String(process.env[name] ?? "").trim();
  if (!raw) return defaultValue;

  // parseInt stops at the first non-digit, so "30m" would become 30 and "1.5"
  // would become 1. Validate the whole string so a malformed setting falls
  // back to the default instead of being silently misread.
  if (!/^[+-]?\d+$/.test(raw)) return defaultValue;

  const parsed = Number.parseInt(raw, 10);

  // Digit strings beyond 2^53 parse to a finite but imprecise number; reject them too.
  return Number.isSafeInteger(parsed) ? parsed : defaultValue;
}
|
|
|
|
/**
 * Builds a fresh timeline entry id.
 *
 * The id has three underscore-separated parts: the literal prefix "ctle",
 * the current millisecond timestamp in base 36, and 8 random bytes rendered
 * as 16 hex characters.
 *
 * @returns {string} A new id such as "ctle_<base36 time>_<16 hex chars>".
 */
function makeTimelineEntryId() {
  const timestampPart = Date.now().toString(36);
  const randomPart = crypto.randomBytes(8).toString("hex");
  return ["ctle", timestampPart, randomPart].join("_");
}
|
|
|
|
/**
 * Selects non-archived calendar events that have a contact and whose
 * "startsAt" lies within [rangeStart, rangeEnd], ordered by "startsAt".
 */
function selectCandidateEvents(client, rangeStart, rangeEnd) {
  return client.query(
    `
      SELECT
        "id",
        "teamId",
        "contactId",
        "startsAt"
      FROM "CalendarEvent"
      WHERE
        "isArchived" = FALSE
        AND "contactId" IS NOT NULL
        AND "startsAt" >= $1
        AND "startsAt" <= $2
      ORDER BY "startsAt" ASC
    `,
    [rangeStart.toISOString(), rangeEnd.toISOString()],
  );
}

/**
 * Inserts a CALENDAR_EVENT timeline entry, or refreshes the contact, datetime
 * and updatedAt of the existing row for the same team/content pair.
 */
function upsertTimelineEntry(client, { teamId, contactId, contentId, datetime }) {
  return client.query(
    `
      INSERT INTO "ClientTimelineEntry" (
        "id",
        "teamId",
        "contactId",
        "contentType",
        "contentId",
        "datetime",
        "createdAt",
        "updatedAt"
      )
      VALUES ($1, $2, $3, 'CALENDAR_EVENT', $4, $5, NOW(), NOW())
      ON CONFLICT ("teamId", "contentType", "contentId")
      DO UPDATE SET
        "contactId" = EXCLUDED."contactId",
        "datetime" = EXCLUDED."datetime",
        "updatedAt" = NOW()
    `,
    [makeTimelineEntryId(), teamId, contactId, contentId, datetime.toISOString()],
  );
}

/**
 * Runs one pass of the calendar-event timeline scheduler.
 *
 * Configuration is read from the environment:
 * - DATABASE_URL (required): connection string handed to the pg Pool.
 * - TIMELINE_EVENT_PREDUE_MINUTES (default 30, minimum 1): how long before an
 *   event's start its timeline entry becomes due.
 * - TIMELINE_EVENT_LOOKBACK_MINUTES (default 180) and
 *   TIMELINE_EVENT_LOOKAHEAD_MINUTES (default 1440): scan window around the
 *   current time; each is raised to at least the pre-due value.
 * - TIMELINE_SCHEDULER_LOCK_KEY (default 603001): advisory lock key.
 *
 * The pass takes a PostgreSQL advisory lock with pg_try_advisory_lock and
 * returns early, after logging, when the lock is not obtained. Otherwise it
 * upserts one "ClientTimelineEntry" row for every scanned event whose pre-due
 * time has been reached, then logs a summary line.
 *
 * @returns {Promise<void>}
 * @throws {Error} When DATABASE_URL is missing or empty; database errors propagate.
 */
async function run() {
  const databaseUrl = String(process.env.DATABASE_URL ?? "").trim();
  if (!databaseUrl) {
    throw new Error("DATABASE_URL is required");
  }

  const preDueMinutes = Math.max(1, readIntEnv("TIMELINE_EVENT_PREDUE_MINUTES", 30));
  const lookbackMinutes = Math.max(preDueMinutes, readIntEnv("TIMELINE_EVENT_LOOKBACK_MINUTES", 180));
  const lookaheadMinutes = Math.max(preDueMinutes, readIntEnv("TIMELINE_EVENT_LOOKAHEAD_MINUTES", 1440));
  const lockKey = readIntEnv("TIMELINE_SCHEDULER_LOCK_KEY", 603001);

  const now = new Date();
  const rangeStart = new Date(now.getTime() - lookbackMinutes * 60_000);
  const rangeEnd = new Date(now.getTime() + lookaheadMinutes * 60_000);

  const pool = new Pool({ connectionString: databaseUrl });
  let client;

  try {
    // Connect inside the try so the pool is still shut down when connecting fails.
    client = await pool.connect();

    const lockRes = await client.query("SELECT pg_try_advisory_lock($1) AS locked", [lockKey]);
    const locked = Boolean(lockRes.rows?.[0]?.locked);
    if (!locked) {
      console.log(`[timeline-calendar-scheduler] skipped: lock ${lockKey} is busy`);
      return;
    }

    try {
      const eventsRes = await selectCandidateEvents(client, rangeStart, rangeEnd);

      let touched = 0;
      let skippedBeforeWindow = 0;

      for (const event of eventsRes.rows) {
        const contactId = String(event.contactId ?? "").trim();
        const teamId = String(event.teamId ?? "").trim();
        const contentId = String(event.id ?? "").trim();
        const startsAt = new Date(event.startsAt);

        // Rows with a blank identifier or an unparseable start time are ignored.
        if (!contactId || !teamId || !contentId || Number.isNaN(startsAt.getTime())) continue;

        const preDueAt = new Date(startsAt.getTime() - preDueMinutes * 60_000);
        if (now < preDueAt) {
          // Not due yet; a later pass handles it once the pre-due time is reached.
          skippedBeforeWindow += 1;
          continue;
        }

        // Sequential on purpose: every statement shares the single locked client.
        await upsertTimelineEntry(client, { teamId, contactId, contentId, datetime: preDueAt });
        touched += 1;
      }

      console.log(
        `[timeline-calendar-scheduler] done: scanned=${eventsRes.rowCount ?? 0} updated=${touched} skipped_before_window=${skippedBeforeWindow} at=${now.toISOString()}`,
      );
    } finally {
      // Best-effort unlock: a failure here must not mask the outcome of the
      // pass, but it is reported instead of being dropped silently.
      try {
        await client.query("SELECT pg_advisory_unlock($1)", [lockKey]);
      } catch (unlockError) {
        const reason = unlockError instanceof Error ? unlockError.message : String(unlockError);
        console.warn(`[timeline-calendar-scheduler] warning: failed to release lock ${lockKey}: ${reason}`);
      }
    }
  } finally {
    client?.release();
    await pool.end();
  }
}
|
|
|
|
// Script entry point: start the scheduler pass. On failure, print the stack
// (or the message, or the stringified value for non-Error rejections) and
// mark the process as failed without forcing an immediate exit.
run().catch((err) => {
  let detail;
  if (err instanceof Error) {
    detail = err.stack || err.message;
  } else {
    detail = String(err);
  }

  console.error(`[timeline-calendar-scheduler] failed: ${detail}`);
  process.exitCode = 1;
});
|