refactor: flatten scheduler service to root schedulers dir
This commit is contained in:
118
schedulers/run.mjs
Normal file
118
schedulers/run.mjs
Normal file
@@ -0,0 +1,118 @@
|
||||
import crypto from "node:crypto";
|
||||
import { Pool } from "pg";
|
||||
|
||||
function readIntEnv(name, defaultValue) {
|
||||
const raw = String(process.env[name] ?? "").trim();
|
||||
if (!raw) return defaultValue;
|
||||
const parsed = Number.parseInt(raw, 10);
|
||||
return Number.isFinite(parsed) ? parsed : defaultValue;
|
||||
}
|
||||
|
||||
function makeTimelineEntryId() {
|
||||
return `ctle_${Date.now().toString(36)}_${crypto.randomBytes(8).toString("hex")}`;
|
||||
}
|
||||
|
||||
async function run() {
|
||||
const databaseUrl = String(process.env.DATABASE_URL ?? "").trim();
|
||||
if (!databaseUrl) {
|
||||
throw new Error("DATABASE_URL is required");
|
||||
}
|
||||
|
||||
const preDueMinutes = Math.max(1, readIntEnv("TIMELINE_EVENT_PREDUE_MINUTES", 30));
|
||||
const lookbackMinutes = Math.max(preDueMinutes, readIntEnv("TIMELINE_EVENT_LOOKBACK_MINUTES", 180));
|
||||
const lookaheadMinutes = Math.max(preDueMinutes, readIntEnv("TIMELINE_EVENT_LOOKAHEAD_MINUTES", 1440));
|
||||
const lockKey = readIntEnv("TIMELINE_SCHEDULER_LOCK_KEY", 603001);
|
||||
|
||||
const now = new Date();
|
||||
const rangeStart = new Date(now.getTime() - lookbackMinutes * 60_000);
|
||||
const rangeEnd = new Date(now.getTime() + lookaheadMinutes * 60_000);
|
||||
|
||||
const pool = new Pool({ connectionString: databaseUrl });
|
||||
const client = await pool.connect();
|
||||
|
||||
try {
|
||||
const lockRes = await client.query("SELECT pg_try_advisory_lock($1) AS locked", [lockKey]);
|
||||
const locked = Boolean(lockRes.rows?.[0]?.locked);
|
||||
if (!locked) {
|
||||
console.log(`[timeline-calendar-scheduler] skipped: lock ${lockKey} is busy`);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const eventsRes = await client.query(
|
||||
`
|
||||
SELECT
|
||||
"id",
|
||||
"teamId",
|
||||
"contactId",
|
||||
"startsAt"
|
||||
FROM "CalendarEvent"
|
||||
WHERE
|
||||
"isArchived" = FALSE
|
||||
AND "contactId" IS NOT NULL
|
||||
AND "startsAt" >= $1
|
||||
AND "startsAt" <= $2
|
||||
ORDER BY "startsAt" ASC
|
||||
`,
|
||||
[rangeStart.toISOString(), rangeEnd.toISOString()],
|
||||
);
|
||||
|
||||
let touched = 0;
|
||||
let skippedBeforeWindow = 0;
|
||||
|
||||
for (const event of eventsRes.rows) {
|
||||
const contactId = String(event.contactId ?? "").trim();
|
||||
const teamId = String(event.teamId ?? "").trim();
|
||||
const contentId = String(event.id ?? "").trim();
|
||||
const startsAt = new Date(event.startsAt);
|
||||
|
||||
if (!contactId || !teamId || !contentId || Number.isNaN(startsAt.getTime())) continue;
|
||||
|
||||
const preDueAt = new Date(startsAt.getTime() - preDueMinutes * 60_000);
|
||||
if (now < preDueAt) {
|
||||
skippedBeforeWindow += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
await client.query(
|
||||
`
|
||||
INSERT INTO "ClientTimelineEntry" (
|
||||
"id",
|
||||
"teamId",
|
||||
"contactId",
|
||||
"contentType",
|
||||
"contentId",
|
||||
"datetime",
|
||||
"createdAt",
|
||||
"updatedAt"
|
||||
)
|
||||
VALUES ($1, $2, $3, 'CALENDAR_EVENT', $4, $5, NOW(), NOW())
|
||||
ON CONFLICT ("teamId", "contentType", "contentId")
|
||||
DO UPDATE SET
|
||||
"contactId" = EXCLUDED."contactId",
|
||||
"datetime" = EXCLUDED."datetime",
|
||||
"updatedAt" = NOW()
|
||||
`,
|
||||
[makeTimelineEntryId(), teamId, contactId, contentId, preDueAt.toISOString()],
|
||||
);
|
||||
|
||||
touched += 1;
|
||||
}
|
||||
|
||||
console.log(
|
||||
`[timeline-calendar-scheduler] done: scanned=${eventsRes.rowCount ?? 0} updated=${touched} skipped_before_window=${skippedBeforeWindow} at=${now.toISOString()}`,
|
||||
);
|
||||
} finally {
|
||||
await client.query("SELECT pg_advisory_unlock($1)", [lockKey]).catch(() => undefined);
|
||||
}
|
||||
} finally {
|
||||
client.release();
|
||||
await pool.end();
|
||||
}
|
||||
}
|
||||
|
||||
run().catch((error) => {
|
||||
const message = error instanceof Error ? error.stack || error.message : String(error);
|
||||
console.error(`[timeline-calendar-scheduler] failed: ${message}`);
|
||||
process.exitCode = 1;
|
||||
});
|
||||
Reference in New Issue
Block a user