refactor: move timeline scheduler to isolated service
This commit is contained in:
@@ -2,7 +2,7 @@ version = 1
|
||||
|
||||
[services]
|
||||
frontend = { deploy_mode = "dokploy_webhook", env_storage = "dokploy_ui" }
|
||||
"frontend/schedulers/client-timeline-calendar" = { deploy_mode = "dokploy_webhook", env_storage = "dokploy_ui" }
|
||||
"schedulers/client-timeline-calendar" = { deploy_mode = "dokploy_webhook", env_storage = "dokploy_ui" }
|
||||
omni_outbound = { deploy_mode = "dokploy_webhook", env_storage = "dokploy_ui" }
|
||||
omni_inbound = { deploy_mode = "dokploy_webhook", env_storage = "dokploy_ui" }
|
||||
omni_chat = { deploy_mode = "dokploy_webhook", env_storage = "dokploy_ui" }
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
FROM node:22-bookworm-slim
|
||||
|
||||
WORKDIR /app/frontend
|
||||
|
||||
RUN apt-get update -y \
|
||||
&& apt-get install -y --no-install-recommends openssl ca-certificates \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY package*.json ./
|
||||
RUN npm ci --ignore-scripts --legacy-peer-deps
|
||||
|
||||
COPY prisma ./prisma
|
||||
RUN npx prisma generate --schema=prisma/schema.prisma
|
||||
|
||||
COPY schedulers ./schedulers
|
||||
|
||||
ENV NODE_ENV=production
|
||||
|
||||
CMD ["node", "schedulers/client-timeline-calendar/run.mjs"]
|
||||
@@ -1,38 +0,0 @@
|
||||
# Client Timeline Calendar Scheduler
|
||||
|
||||
Одноразовый scheduler-раннер для актуализации `ClientTimelineEntry.datetime` по календарным событиям.
|
||||
|
||||
## Что делает
|
||||
|
||||
- Ищет неархивные `CalendarEvent` с привязкой к контакту в окне времени вокруг `now`.
|
||||
- Если событие уже вошло в окно `start - 30 минут`, делает `upsert` в `ClientTimelineEntry` с `contentType=CALENDAR_EVENT` и `datetime=start-30m`.
|
||||
- Логика идемпотентна: повторный запуск только подтверждает корректное значение.
|
||||
- Берёт Postgres advisory lock, чтобы параллельные запуски не конфликтовали.
|
||||
|
||||
## Переменные окружения
|
||||
|
||||
- `DATABASE_URL` (обязательно)
|
||||
- `TIMELINE_EVENT_PREDUE_MINUTES` (по умолчанию `30`)
|
||||
- `TIMELINE_EVENT_LOOKBACK_MINUTES` (по умолчанию `180`)
|
||||
- `TIMELINE_EVENT_LOOKAHEAD_MINUTES` (по умолчанию `1440`)
|
||||
- `TIMELINE_SCHEDULER_LOCK_KEY` (по умолчанию `603001`)
|
||||
|
||||
## Локальный запуск
|
||||
|
||||
```bash
|
||||
cd frontend
|
||||
node schedulers/client-timeline-calendar/run.mjs
|
||||
```
|
||||
|
||||
## Docker запуск
|
||||
|
||||
```bash
|
||||
docker build -f schedulers/client-timeline-calendar/Dockerfile -t client-timeline-calendar-scheduler .
|
||||
docker run --rm \
|
||||
-e DATABASE_URL="$DATABASE_URL" \
|
||||
client-timeline-calendar-scheduler
|
||||
```
|
||||
|
||||
## Dokploy schedule
|
||||
|
||||
Создай Scheduled Job и поставь период `* * * * *` (раз в минуту), который запускает этот контейнер/команду.
|
||||
@@ -1,109 +0,0 @@
|
||||
import { PrismaClient } from "@prisma/client";
|
||||
|
||||
const prisma = new PrismaClient();
|
||||
|
||||
function readIntEnv(name, defaultValue) {
|
||||
const raw = String(process.env[name] ?? "").trim();
|
||||
if (!raw) return defaultValue;
|
||||
const parsed = Number.parseInt(raw, 10);
|
||||
return Number.isFinite(parsed) ? parsed : defaultValue;
|
||||
}
|
||||
|
||||
async function acquirePgLock(lockKey) {
|
||||
const rows = await prisma.$queryRawUnsafe("SELECT pg_try_advisory_lock($1) AS locked", lockKey);
|
||||
const first = Array.isArray(rows) ? rows[0] : null;
|
||||
return Boolean(first && (first.locked === true || first.locked === "t" || first.locked === 1));
|
||||
}
|
||||
|
||||
async function releasePgLock(lockKey) {
|
||||
await prisma.$queryRawUnsafe("SELECT pg_advisory_unlock($1)", lockKey).catch(() => undefined);
|
||||
}
|
||||
|
||||
async function run() {
|
||||
const preDueMinutes = Math.max(1, readIntEnv("TIMELINE_EVENT_PREDUE_MINUTES", 30));
|
||||
const lookbackMinutes = Math.max(preDueMinutes, readIntEnv("TIMELINE_EVENT_LOOKBACK_MINUTES", 180));
|
||||
const lookaheadMinutes = Math.max(preDueMinutes, readIntEnv("TIMELINE_EVENT_LOOKAHEAD_MINUTES", 1440));
|
||||
const lockKey = readIntEnv("TIMELINE_SCHEDULER_LOCK_KEY", 603001);
|
||||
|
||||
const now = new Date();
|
||||
const rangeStart = new Date(now.getTime() - lookbackMinutes * 60_000);
|
||||
const rangeEnd = new Date(now.getTime() + lookaheadMinutes * 60_000);
|
||||
|
||||
const locked = await acquirePgLock(lockKey);
|
||||
if (!locked) {
|
||||
console.log(`[timeline-calendar-scheduler] skipped: lock ${lockKey} is busy`);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const events = await prisma.calendarEvent.findMany({
|
||||
where: {
|
||||
isArchived: false,
|
||||
contactId: { not: null },
|
||||
startsAt: {
|
||||
gte: rangeStart,
|
||||
lte: rangeEnd,
|
||||
},
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
teamId: true,
|
||||
contactId: true,
|
||||
startsAt: true,
|
||||
},
|
||||
orderBy: { startsAt: "asc" },
|
||||
});
|
||||
|
||||
let touched = 0;
|
||||
let skippedBeforeWindow = 0;
|
||||
|
||||
for (const event of events) {
|
||||
if (!event.contactId) continue;
|
||||
|
||||
const preDueAt = new Date(event.startsAt.getTime() - preDueMinutes * 60_000);
|
||||
if (now < preDueAt) {
|
||||
skippedBeforeWindow += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
await prisma.clientTimelineEntry.upsert({
|
||||
where: {
|
||||
teamId_contentType_contentId: {
|
||||
teamId: event.teamId,
|
||||
contentType: "CALENDAR_EVENT",
|
||||
contentId: event.id,
|
||||
},
|
||||
},
|
||||
create: {
|
||||
teamId: event.teamId,
|
||||
contactId: event.contactId,
|
||||
contentType: "CALENDAR_EVENT",
|
||||
contentId: event.id,
|
||||
datetime: preDueAt,
|
||||
},
|
||||
update: {
|
||||
contactId: event.contactId,
|
||||
datetime: preDueAt,
|
||||
},
|
||||
});
|
||||
|
||||
touched += 1;
|
||||
}
|
||||
|
||||
console.log(
|
||||
`[timeline-calendar-scheduler] done: scanned=${events.length} updated=${touched} skipped_before_window=${skippedBeforeWindow} at=${now.toISOString()}`,
|
||||
);
|
||||
} finally {
|
||||
await releasePgLock(lockKey);
|
||||
}
|
||||
}
|
||||
|
||||
run()
|
||||
.catch((error) => {
|
||||
const message = error instanceof Error ? error.stack || error.message : String(error);
|
||||
console.error(`[timeline-calendar-scheduler] failed: ${message}`);
|
||||
process.exitCode = 1;
|
||||
})
|
||||
.finally(async () => {
|
||||
await prisma.$disconnect();
|
||||
});
|
||||
@@ -77,7 +77,15 @@ async function validateSessionFromPeer(peer: any) {
|
||||
}
|
||||
|
||||
async function computeTeamSignature(teamId: string) {
|
||||
const [omniMessageMax, contactMax, contactMessageMax, telegramConnectionMax, contactInboxMax, inboxPrefMax] = await Promise.all([
|
||||
const [
|
||||
omniMessageMax,
|
||||
contactMax,
|
||||
contactMessageMax,
|
||||
telegramConnectionMax,
|
||||
contactInboxMax,
|
||||
inboxPrefMax,
|
||||
clientTimelineEntryMax,
|
||||
] = await Promise.all([
|
||||
prisma.omniMessage.aggregate({
|
||||
where: { teamId },
|
||||
_max: { updatedAt: true },
|
||||
@@ -102,6 +110,10 @@ async function computeTeamSignature(teamId: string) {
|
||||
where: { teamId },
|
||||
_max: { updatedAt: true },
|
||||
}),
|
||||
prisma.clientTimelineEntry.aggregate({
|
||||
where: { teamId },
|
||||
_max: { updatedAt: true },
|
||||
}),
|
||||
]);
|
||||
|
||||
return [
|
||||
@@ -111,6 +123,7 @@ async function computeTeamSignature(teamId: string) {
|
||||
telegramConnectionMax._max.updatedAt?.toISOString() ?? "",
|
||||
contactInboxMax._max.updatedAt?.toISOString() ?? "",
|
||||
inboxPrefMax._max.updatedAt?.toISOString() ?? "",
|
||||
clientTimelineEntryMax._max.updatedAt?.toISOString() ?? "",
|
||||
].join("|");
|
||||
}
|
||||
|
||||
|
||||
12
schedulers/client-timeline-calendar/Dockerfile
Normal file
12
schedulers/client-timeline-calendar/Dockerfile
Normal file
@@ -0,0 +1,12 @@
|
||||
FROM node:22-bookworm-slim
|
||||
|
||||
WORKDIR /app/scheduler
|
||||
|
||||
COPY package*.json ./
|
||||
RUN npm ci --omit=dev
|
||||
|
||||
COPY run.mjs ./run.mjs
|
||||
|
||||
ENV NODE_ENV=production
|
||||
|
||||
CMD ["npm", "run", "start"]
|
||||
38
schedulers/client-timeline-calendar/README.md
Normal file
38
schedulers/client-timeline-calendar/README.md
Normal file
@@ -0,0 +1,38 @@
|
||||
# Client Timeline Calendar Scheduler
|
||||
|
||||
Изолированный scheduler-сервис для актуализации `ClientTimelineEntry.datetime` за 30 минут до календарного события.
|
||||
|
||||
## Что делает
|
||||
|
||||
- Берет advisory lock в PostgreSQL, чтобы не было гонок между инстансами.
|
||||
- Находит активные `CalendarEvent` с `contactId` в рабочем окне времени.
|
||||
- Когда событие вошло в окно `start - 30m`, делает upsert в `ClientTimelineEntry`:
|
||||
- `contentType = CALENDAR_EVENT`
|
||||
- `datetime = startsAt - TIMELINE_EVENT_PREDUE_MINUTES`
|
||||
|
||||
## ENV
|
||||
|
||||
- `DATABASE_URL` (обязательно)
|
||||
- `TIMELINE_EVENT_PREDUE_MINUTES` (default `30`)
|
||||
- `TIMELINE_EVENT_LOOKBACK_MINUTES` (default `180`)
|
||||
- `TIMELINE_EVENT_LOOKAHEAD_MINUTES` (default `1440`)
|
||||
- `TIMELINE_SCHEDULER_LOCK_KEY` (default `603001`)
|
||||
|
||||
## Локально
|
||||
|
||||
```bash
|
||||
cd schedulers/client-timeline-calendar
|
||||
npm install
|
||||
npm run start
|
||||
```
|
||||
|
||||
## Docker
|
||||
|
||||
```bash
|
||||
docker build -t client-timeline-calendar-scheduler .
|
||||
docker run --rm -e DATABASE_URL="$DATABASE_URL" client-timeline-calendar-scheduler
|
||||
```
|
||||
|
||||
## Dokploy
|
||||
|
||||
Сделай отдельный app/job и поставь schedule `* * * * *`.
|
||||
160
schedulers/client-timeline-calendar/package-lock.json
generated
Normal file
160
schedulers/client-timeline-calendar/package-lock.json
generated
Normal file
@@ -0,0 +1,160 @@
|
||||
{
|
||||
"name": "client-timeline-calendar-scheduler",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "client-timeline-calendar-scheduler",
|
||||
"dependencies": {
|
||||
"pg": "^8.16.3"
|
||||
}
|
||||
},
|
||||
"node_modules/pg": {
|
||||
"version": "8.18.0",
|
||||
"resolved": "https://registry.npmjs.org/pg/-/pg-8.18.0.tgz",
|
||||
"integrity": "sha512-xqrUDL1b9MbkydY/s+VZ6v+xiMUmOUk7SS9d/1kpyQxoJ6U9AO1oIJyUWVZojbfe5Cc/oluutcgFG4L9RDP1iQ==",
|
||||
"license": "MIT",
|
||||
"peer": true,
|
||||
"dependencies": {
|
||||
"pg-connection-string": "^2.11.0",
|
||||
"pg-pool": "^3.11.0",
|
||||
"pg-protocol": "^1.11.0",
|
||||
"pg-types": "2.2.0",
|
||||
"pgpass": "1.0.5"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">= 16.0.0"
|
||||
},
|
||||
"optionalDependencies": {
|
||||
"pg-cloudflare": "^1.3.0"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"pg-native": ">=3.0.1"
|
||||
},
|
||||
"peerDependenciesMeta": {
|
||||
"pg-native": {
|
||||
"optional": true
|
||||
}
|
||||
}
|
||||
},
|
||||
"node_modules/pg-cloudflare": {
|
||||
"version": "1.3.0",
|
||||
"resolved": "https://registry.npmjs.org/pg-cloudflare/-/pg-cloudflare-1.3.0.tgz",
|
||||
"integrity": "sha512-6lswVVSztmHiRtD6I8hw4qP/nDm1EJbKMRhf3HCYaqud7frGysPv7FYJ5noZQdhQtN2xJnimfMtvQq21pdbzyQ==",
|
||||
"license": "MIT",
|
||||
"optional": true
|
||||
},
|
||||
"node_modules/pg-connection-string": {
|
||||
"version": "2.11.0",
|
||||
"resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.11.0.tgz",
|
||||
"integrity": "sha512-kecgoJwhOpxYU21rZjULrmrBJ698U2RxXofKVzOn5UDj61BPj/qMb7diYUR1nLScCDbrztQFl1TaQZT0t1EtzQ==",
|
||||
"license": "MIT"
|
||||
},
|
||||
"node_modules/pg-int8": {
|
||||
"version": "1.0.1",
|
||||
"resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz",
|
||||
"integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==",
|
||||
"license": "ISC",
|
||||
"engines": {
|
||||
"node": ">=4.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/pg-pool": {
|
||||
"version": "3.11.0",
|
||||
"resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.11.0.tgz",
|
||||
"integrity": "sha512-MJYfvHwtGp870aeusDh+hg9apvOe2zmpZJpyt+BMtzUWlVqbhFmMK6bOBXLBUPd7iRtIF9fZplDc7KrPN3PN7w==",
|
||||
"license": "MIT",
|
||||
"peerDependencies": {
|
||||
"pg": ">=8.0"
|
||||
}
|
||||
},
|
||||
"node_modules/pg-protocol": {
|
||||
"version": "1.11.0",
|
||||
"resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.11.0.tgz",
|
||||
"integrity": "sha512-pfsxk2M9M3BuGgDOfuy37VNRRX3jmKgMjcvAcWqNDpZSf4cUmv8HSOl5ViRQFsfARFn0KuUQTgLxVMbNq5NW3g==",
|
||||
"license": "MIT"
|
||||
},
|
||||
"node_modules/pg-types": {
|
||||
"version": "2.2.0",
|
||||
"resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz",
|
||||
"integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"pg-int8": "1.0.1",
|
||||
"postgres-array": "~2.0.0",
|
||||
"postgres-bytea": "~1.0.0",
|
||||
"postgres-date": "~1.0.4",
|
||||
"postgres-interval": "^1.1.0"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=4"
|
||||
}
|
||||
},
|
||||
"node_modules/pgpass": {
|
||||
"version": "1.0.5",
|
||||
"resolved": "https://registry.npmjs.org/pgpass/-/pgpass-1.0.5.tgz",
|
||||
"integrity": "sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"split2": "^4.1.0"
|
||||
}
|
||||
},
|
||||
"node_modules/postgres-array": {
|
||||
"version": "2.0.0",
|
||||
"resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz",
|
||||
"integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=4"
|
||||
}
|
||||
},
|
||||
"node_modules/postgres-bytea": {
|
||||
"version": "1.0.1",
|
||||
"resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.1.tgz",
|
||||
"integrity": "sha512-5+5HqXnsZPE65IJZSMkZtURARZelel2oXUEO8rH83VS/hxH5vv1uHquPg5wZs8yMAfdv971IU+kcPUczi7NVBQ==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=0.10.0"
|
||||
}
|
||||
},
|
||||
"node_modules/postgres-date": {
|
||||
"version": "1.0.7",
|
||||
"resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz",
|
||||
"integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=0.10.0"
|
||||
}
|
||||
},
|
||||
"node_modules/postgres-interval": {
|
||||
"version": "1.2.0",
|
||||
"resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz",
|
||||
"integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"xtend": "^4.0.0"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=0.10.0"
|
||||
}
|
||||
},
|
||||
"node_modules/split2": {
|
||||
"version": "4.2.0",
|
||||
"resolved": "https://registry.npmjs.org/split2/-/split2-4.2.0.tgz",
|
||||
"integrity": "sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==",
|
||||
"license": "ISC",
|
||||
"engines": {
|
||||
"node": ">= 10.x"
|
||||
}
|
||||
},
|
||||
"node_modules/xtend": {
|
||||
"version": "4.0.2",
|
||||
"resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz",
|
||||
"integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=0.4"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
11
schedulers/client-timeline-calendar/package.json
Normal file
11
schedulers/client-timeline-calendar/package.json
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"name": "client-timeline-calendar-scheduler",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"start": "node run.mjs"
|
||||
},
|
||||
"dependencies": {
|
||||
"pg": "^8.16.3"
|
||||
}
|
||||
}
|
||||
118
schedulers/client-timeline-calendar/run.mjs
Normal file
118
schedulers/client-timeline-calendar/run.mjs
Normal file
@@ -0,0 +1,118 @@
|
||||
import crypto from "node:crypto";
|
||||
import { Pool } from "pg";
|
||||
|
||||
function readIntEnv(name, defaultValue) {
|
||||
const raw = String(process.env[name] ?? "").trim();
|
||||
if (!raw) return defaultValue;
|
||||
const parsed = Number.parseInt(raw, 10);
|
||||
return Number.isFinite(parsed) ? parsed : defaultValue;
|
||||
}
|
||||
|
||||
function makeTimelineEntryId() {
|
||||
return `ctle_${Date.now().toString(36)}_${crypto.randomBytes(8).toString("hex")}`;
|
||||
}
|
||||
|
||||
async function run() {
|
||||
const databaseUrl = String(process.env.DATABASE_URL ?? "").trim();
|
||||
if (!databaseUrl) {
|
||||
throw new Error("DATABASE_URL is required");
|
||||
}
|
||||
|
||||
const preDueMinutes = Math.max(1, readIntEnv("TIMELINE_EVENT_PREDUE_MINUTES", 30));
|
||||
const lookbackMinutes = Math.max(preDueMinutes, readIntEnv("TIMELINE_EVENT_LOOKBACK_MINUTES", 180));
|
||||
const lookaheadMinutes = Math.max(preDueMinutes, readIntEnv("TIMELINE_EVENT_LOOKAHEAD_MINUTES", 1440));
|
||||
const lockKey = readIntEnv("TIMELINE_SCHEDULER_LOCK_KEY", 603001);
|
||||
|
||||
const now = new Date();
|
||||
const rangeStart = new Date(now.getTime() - lookbackMinutes * 60_000);
|
||||
const rangeEnd = new Date(now.getTime() + lookaheadMinutes * 60_000);
|
||||
|
||||
const pool = new Pool({ connectionString: databaseUrl });
|
||||
const client = await pool.connect();
|
||||
|
||||
try {
|
||||
const lockRes = await client.query("SELECT pg_try_advisory_lock($1) AS locked", [lockKey]);
|
||||
const locked = Boolean(lockRes.rows?.[0]?.locked);
|
||||
if (!locked) {
|
||||
console.log(`[timeline-calendar-scheduler] skipped: lock ${lockKey} is busy`);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const eventsRes = await client.query(
|
||||
`
|
||||
SELECT
|
||||
"id",
|
||||
"teamId",
|
||||
"contactId",
|
||||
"startsAt"
|
||||
FROM "CalendarEvent"
|
||||
WHERE
|
||||
"isArchived" = FALSE
|
||||
AND "contactId" IS NOT NULL
|
||||
AND "startsAt" >= $1
|
||||
AND "startsAt" <= $2
|
||||
ORDER BY "startsAt" ASC
|
||||
`,
|
||||
[rangeStart.toISOString(), rangeEnd.toISOString()],
|
||||
);
|
||||
|
||||
let touched = 0;
|
||||
let skippedBeforeWindow = 0;
|
||||
|
||||
for (const event of eventsRes.rows) {
|
||||
const contactId = String(event.contactId ?? "").trim();
|
||||
const teamId = String(event.teamId ?? "").trim();
|
||||
const contentId = String(event.id ?? "").trim();
|
||||
const startsAt = new Date(event.startsAt);
|
||||
|
||||
if (!contactId || !teamId || !contentId || Number.isNaN(startsAt.getTime())) continue;
|
||||
|
||||
const preDueAt = new Date(startsAt.getTime() - preDueMinutes * 60_000);
|
||||
if (now < preDueAt) {
|
||||
skippedBeforeWindow += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
await client.query(
|
||||
`
|
||||
INSERT INTO "ClientTimelineEntry" (
|
||||
"id",
|
||||
"teamId",
|
||||
"contactId",
|
||||
"contentType",
|
||||
"contentId",
|
||||
"datetime",
|
||||
"createdAt",
|
||||
"updatedAt"
|
||||
)
|
||||
VALUES ($1, $2, $3, 'CALENDAR_EVENT', $4, $5, NOW(), NOW())
|
||||
ON CONFLICT ("teamId", "contentType", "contentId")
|
||||
DO UPDATE SET
|
||||
"contactId" = EXCLUDED."contactId",
|
||||
"datetime" = EXCLUDED."datetime",
|
||||
"updatedAt" = NOW()
|
||||
`,
|
||||
[makeTimelineEntryId(), teamId, contactId, contentId, preDueAt.toISOString()],
|
||||
);
|
||||
|
||||
touched += 1;
|
||||
}
|
||||
|
||||
console.log(
|
||||
`[timeline-calendar-scheduler] done: scanned=${eventsRes.rowCount ?? 0} updated=${touched} skipped_before_window=${skippedBeforeWindow} at=${now.toISOString()}`,
|
||||
);
|
||||
} finally {
|
||||
await client.query("SELECT pg_advisory_unlock($1)", [lockKey]).catch(() => undefined);
|
||||
}
|
||||
} finally {
|
||||
client.release();
|
||||
await pool.end();
|
||||
}
|
||||
}
|
||||
|
||||
run().catch((error) => {
|
||||
const message = error instanceof Error ? error.stack || error.message : String(error);
|
||||
console.error(`[timeline-calendar-scheduler] failed: ${message}`);
|
||||
process.exitCode = 1;
|
||||
});
|
||||
Reference in New Issue
Block a user