Use Postgres state DB for Telegram profile updates

This commit is contained in:
Ruslan Bakiev
2026-03-12 17:15:36 +07:00
parent 64e0d7565f
commit 42de04c1f6
6 changed files with 221 additions and 498 deletions

View File

@@ -1,4 +1,5 @@
import { startServer } from "./server";
import { closeProfileState } from "./profile-state";
const server = startServer();
@@ -15,8 +16,9 @@ async function shutdown(signal: string) {
resolve();
});
});
await closeProfileState();
} catch {
// ignore shutdown errors
// ignore shutdown errors during termination
}
process.exit(0);

View File

@@ -1,13 +1,10 @@
import { mkdirSync } from "node:fs";
import path from "node:path";
import Database from "better-sqlite3";
import { Pool } from "pg";
import type { OmniInboundEnvelopeV1 } from "./types";
const TELEGRAM_FILE_MARKER = "tg-file:";
const DEFAULT_STATE_DB_PATH = path.join(process.cwd(), ".data", "telegram_backend", "state.sqlite");
type AvatarStateRow = {
avatarFingerprint: string | null;
avatar_fingerprint: string | null;
};
function asString(value: unknown) {
@@ -16,9 +13,12 @@ function asString(value: unknown) {
return normalized || null;
}
function stateDbPath() {
const fromEnv = asString(process.env.TELEGRAM_PROFILE_STATE_DB_PATH);
return fromEnv ?? DEFAULT_STATE_DB_PATH;
function requiredEnv(name: string) {
const value = asString(process.env[name]);
if (!value) {
throw new Error(`${name} is required`);
}
return value;
}
function parseTelegramFileId(avatarUrl: string | null) {
@@ -28,44 +28,29 @@ function parseTelegramFileId(avatarUrl: string | null) {
return fileId || null;
}
function openStateDb() {
const dbPath = stateDbPath();
const dbDir = path.dirname(dbPath);
mkdirSync(dbDir, { recursive: true });
const pool = new Pool({
connectionString: requiredEnv("TELEGRAM_PROFILE_STATE_DATABASE_URL"),
});
const db = new Database(dbPath);
db.exec(`
CREATE TABLE IF NOT EXISTS telegram_contact_profile_state (
contact_external_id TEXT PRIMARY KEY,
avatar_fingerprint TEXT,
avatar_file_id TEXT,
updated_at TEXT NOT NULL
)
`);
return db;
let initPromise: Promise<void> | null = null;
async function ensureInitialized() {
if (!initPromise) {
initPromise = pool
.query(`
CREATE TABLE IF NOT EXISTS telegram_contact_profile_state (
contact_external_id TEXT PRIMARY KEY,
avatar_fingerprint TEXT,
avatar_file_id TEXT,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`)
.then(() => undefined);
}
await initPromise;
}
const db = openStateDb();
const selectStateStatement = db.prepare(`
SELECT avatar_fingerprint AS avatarFingerprint
FROM telegram_contact_profile_state
WHERE contact_external_id = ?
`);
const upsertStateStatement = db.prepare(`
INSERT INTO telegram_contact_profile_state (
contact_external_id,
avatar_fingerprint,
avatar_file_id,
updated_at
)
VALUES (?, ?, ?, ?)
ON CONFLICT(contact_external_id) DO UPDATE SET
avatar_fingerprint = excluded.avatar_fingerprint,
avatar_file_id = excluded.avatar_file_id,
updated_at = excluded.updated_at
`);
function detectAvatarChange(input: {
async function detectAvatarChange(input: {
contactExternalId: string | null;
avatarFileId: string | null;
avatarFingerprint: string | null;
@@ -78,25 +63,49 @@ function detectAvatarChange(input: {
return { changed: false };
}
const row = selectStateStatement.get(contactExternalId) as AvatarStateRow | undefined;
const previousFingerprint = asString(row?.avatarFingerprint);
await ensureInitialized();
const row = await pool.query<AvatarStateRow>(
`
SELECT avatar_fingerprint
FROM telegram_contact_profile_state
WHERE contact_external_id = $1
`,
[contactExternalId],
);
const previousFingerprint = asString(row.rows[0]?.avatar_fingerprint);
const changed = previousFingerprint !== avatarFingerprint;
if (changed) {
upsertStateStatement.run(contactExternalId, avatarFingerprint, avatarFileId, new Date().toISOString());
await pool.query(
`
INSERT INTO telegram_contact_profile_state (
contact_external_id,
avatar_fingerprint,
avatar_file_id,
updated_at
)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (contact_external_id) DO UPDATE SET
avatar_fingerprint = EXCLUDED.avatar_fingerprint,
avatar_file_id = EXCLUDED.avatar_file_id,
updated_at = NOW()
`,
[contactExternalId, avatarFingerprint, avatarFileId],
);
}
return { changed };
}
export function applyAvatarProfileState(envelope: OmniInboundEnvelopeV1): OmniInboundEnvelopeV1 {
export async function applyAvatarProfileState(envelope: OmniInboundEnvelopeV1): Promise<OmniInboundEnvelopeV1> {
const payload = envelope.payloadNormalized;
const contactExternalId = asString(payload.contactExternalId);
const contactAvatarUrl = asString(payload.contactAvatarUrl);
const contactAvatarFingerprint = asString(payload.contactAvatarFingerprint);
const avatarFileId = parseTelegramFileId(contactAvatarUrl);
const avatarState = detectAvatarChange({
const avatarState = await detectAvatarChange({
contactExternalId,
avatarFileId,
avatarFingerprint: contactAvatarFingerprint,
@@ -111,3 +120,7 @@ export function applyAvatarProfileState(envelope: OmniInboundEnvelopeV1): OmniIn
},
};
}
export async function closeProfileState() {
await pool.end();
}

View File

@@ -206,7 +206,7 @@ export function startServer() {
try {
const body = await readJsonBody(req);
const envelope = applyAvatarProfileState(parseTelegramBusinessUpdate(body));
const envelope = await applyAvatarProfileState(parseTelegramBusinessUpdate(body));
const run = await enqueueTelegramInboundTask(envelope);
writeJson(res, 200, {