// Source file: clientsflow/omni_inbound/src/server.ts (TypeScript; listed as 145 lines, 4.2 KiB)

import { timingSafeEqual } from "node:crypto";
import { createServer, type IncomingMessage, type ServerResponse } from "node:http";
import { RECEIVER_FLOW_QUEUE_NAME, enqueueInboundEvent, isDuplicateJobError } from "./queue";
import { parseTelegramBusinessUpdate } from "./telegram";
/** Request-body cap (1 MiB) used when the environment does not supply a usable value. */
const DEFAULT_MAX_BODY_SIZE_BYTES = 1024 * 1024;

/**
 * Converts the raw `MAX_BODY_SIZE_BYTES` environment value into a byte limit.
 *
 * Unset, blank, non-numeric, zero and negative values all fall back to the
 * default. Without this guard a typo would produce `NaN`, and because
 * `total > NaN` is always false the size check would be silently disabled;
 * `0` or a negative number would instead reject every non-empty body.
 *
 * @param raw - The environment value as read, or `undefined` when unset.
 * @returns A positive byte limit.
 */
function resolveMaxBodySizeBytes(raw: string | undefined): number {
  const parsed = Number(raw);
  // `parsed > 0` is false for NaN, so this single test also covers garbage input.
  return parsed > 0 ? parsed : DEFAULT_MAX_BODY_SIZE_BYTES;
}

/** Maximum number of request-body bytes accepted before the read is aborted. */
const MAX_BODY_SIZE_BYTES = resolveMaxBodySizeBytes(process.env.MAX_BODY_SIZE_BYTES);
/**
 * Sends `body` to the client as a JSON response and ends the response.
 *
 * Serialization happens before the response object is touched, so a failure
 * in `JSON.stringify` leaves `res` unmodified.
 *
 * @param res - Response to write to.
 * @param statusCode - HTTP status code to report.
 * @param body - Value to serialize as the response payload.
 */
function writeJson(res: ServerResponse, statusCode: number, body: unknown): void {
  const serialized = JSON.stringify(body);

  res.statusCode = statusCode;
  res.setHeader("content-type", "application/json; charset=utf-8");
  res.end(serialized);
}
/**
 * Reads the whole request body and parses it as JSON.
 *
 * @param req - Incoming request whose body stream is consumed.
 * @returns The parsed JSON value, or `{}` when the body is empty.
 * @throws Error with message `payload_too_large:<limit>` as soon as the
 *   accumulated size exceeds `MAX_BODY_SIZE_BYTES`.
 * @throws SyntaxError when a non-empty body is not valid JSON.
 */
async function readJsonBody(req: IncomingMessage): Promise<unknown> {
  const parts: Buffer[] = [];
  let received = 0;

  for await (const piece of req) {
    // The stream may yield strings if an encoding was set; normalize to Buffer.
    const bytes = Buffer.isBuffer(piece) ? piece : Buffer.from(piece);
    received += bytes.length;
    if (received > MAX_BODY_SIZE_BYTES) {
      throw new Error(`payload_too_large:${MAX_BODY_SIZE_BYTES}`);
    }
    parts.push(bytes);
  }

  const text = Buffer.concat(parts).toString("utf8");
  return text ? JSON.parse(text) : {};
}
/**
 * Checks the `x-telegram-bot-api-secret-token` request header against the
 * `TELEGRAM_WEBHOOK_SECRET` environment variable.
 *
 * Both values are trimmed before comparison. The comparison itself is done
 * with `timingSafeEqual` so that response timing does not reveal how many
 * leading characters of a guessed token were correct.
 *
 * @param req - Incoming webhook request.
 * @returns `true` when the header matches the configured secret, or when no
 *   secret is configured at all; `false` otherwise.
 */
function validateTelegramSecret(req: IncomingMessage): boolean {
  const expected = (process.env.TELEGRAM_WEBHOOK_SECRET || "").trim();
  // NOTE(review): fail-open — with no secret configured every request is
  // accepted. Kept as-is for compatibility; confirm this is intended in production.
  if (!expected) return true;

  const incoming = String(req.headers["x-telegram-bot-api-secret-token"] || "").trim();
  if (incoming === "") return false;

  const expectedBytes = Buffer.from(expected, "utf8");
  const incomingBytes = Buffer.from(incoming, "utf8");
  // timingSafeEqual throws on inputs of different length, so rule that out first.
  // This reveals only the secret's length, not its content.
  if (expectedBytes.length !== incomingBytes.length) return false;

  return timingSafeEqual(expectedBytes, incomingBytes);
}
/**
 * Best-effort relay of a webhook payload to the URL configured in
 * `TELEGRAM_CONNECT_WEBHOOK_FORWARD_URL`.
 *
 * Does nothing when that variable is unset or blank. When
 * `TELEGRAM_WEBHOOK_SECRET` is set, it is sent along in the
 * `x-telegram-bot-api-secret-token` header. Failures (network errors or
 * non-2xx responses) are logged with `console.warn` and never thrown, so the
 * returned promise always resolves.
 *
 * NOTE(review): no explicit timeout is set on the request; it relies on the
 * runtime's fetch defaults.
 *
 * @param rawBody - Payload to forward; `null`/`undefined` is sent as `{}`.
 */
async function forwardTelegramConnectWebhook(rawBody: unknown): Promise<void> {
  const url = (process.env.TELEGRAM_CONNECT_WEBHOOK_FORWARD_URL || "").trim();
  if (!url) return;

  const headers: Record<string, string> = {
    "content-type": "application/json",
  };
  const secret = (process.env.TELEGRAM_WEBHOOK_SECRET || "").trim();
  if (secret) {
    headers["x-telegram-bot-api-secret-token"] = secret;
  }

  try {
    const res = await fetch(url, {
      method: "POST",
      headers,
      body: JSON.stringify(rawBody ?? {}),
    });

    if (res.ok) {
      // The body is not needed on success; cancel it so the underlying
      // connection can be released instead of waiting for garbage collection.
      await res.body?.cancel().catch(() => undefined);
      return;
    }

    const text = await res.text().catch(() => "");
    console.warn(`[omni_inbound] telegram connect forward failed: ${res.status} ${text.slice(0, 300)}`);
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    console.warn(`[omni_inbound] telegram connect forward error: ${message}`);
  }
}
/**
 * Handles `POST /webhooks/telegram/business`.
 *
 * Flow: verify the webhook secret, read and parse the JSON body, convert it
 * with `parseTelegramBusinessUpdate`, enqueue it, then forward the raw body
 * (fire-and-forget) and acknowledge with 200.
 *
 * Responses:
 * - 401 when the secret header does not validate.
 * - 400 `invalid_json` when the body is not valid JSON.
 * - 413 when the body exceeds the configured size limit.
 * - 200 with `duplicate: true` when the queue reports the job as a duplicate.
 * - 503 `receiver_enqueue_failed` for any other failure.
 */
async function handleTelegramBusinessWebhook(req: IncomingMessage, res: ServerResponse): Promise<void> {
  if (!validateTelegramSecret(req)) {
    writeJson(res, 401, { ok: false, error: "invalid_webhook_secret" });
    return;
  }

  let body: unknown = {};
  // Tracks whether the failure happened while reading/parsing the request body,
  // so a SyntaxError raised later in the pipeline is not mistaken for bad client JSON.
  let bodyRead = false;

  try {
    body = await readJsonBody(req);
    bodyRead = true;

    const envelope = parseTelegramBusinessUpdate(body);
    await enqueueInboundEvent(envelope);
    // Forwarding is best-effort and must not delay the acknowledgement.
    void forwardTelegramConnectWebhook(body);
    writeJson(res, 200, {
      ok: true,
      queued: true,
      duplicate: false,
      providerEventId: envelope.providerEventId,
      idempotencyKey: envelope.idempotencyKey,
    });
  } catch (error) {
    if (isDuplicateJobError(error)) {
      // Already queued earlier: still forward, and acknowledge so the sender stops retrying.
      void forwardTelegramConnectWebhook(body);
      writeJson(res, 200, {
        ok: true,
        queued: false,
        duplicate: true,
      });
      return;
    }

    const message = error instanceof Error ? error.message : String(error);

    if (!bodyRead && error instanceof SyntaxError) {
      // Malformed JSON is the client's fault, not a receiver outage.
      writeJson(res, 400, { ok: false, error: "invalid_json", message });
      return;
    }

    const statusCode = message.startsWith("payload_too_large:") ? 413 : 503;
    writeJson(res, statusCode, {
      ok: false,
      error: "receiver_enqueue_failed",
      message,
    });
  }
}

/** Dispatches a request to the matching route, answering 404 for anything unknown. */
async function routeRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
  if (!req.url || !req.method) {
    writeJson(res, 404, { ok: false, error: "not_found" });
    return;
  }

  if (req.url === "/health" && req.method === "GET") {
    writeJson(res, 200, {
      ok: true,
      service: "omni_inbound",
      queue: RECEIVER_FLOW_QUEUE_NAME,
      now: new Date().toISOString(),
    });
    return;
  }

  if (req.url === "/webhooks/telegram/business" && req.method === "POST") {
    await handleTelegramBusinessWebhook(req, res);
    return;
  }

  writeJson(res, 404, { ok: false, error: "not_found" });
}

/**
 * Creates the HTTP server and starts listening on `0.0.0.0` at the port given
 * by the `PORT` environment variable (default 8080).
 *
 * Routes:
 * - `GET /health` — liveness payload with service name, queue name and current time.
 * - `POST /webhooks/telegram/business` — Telegram business webhook receiver.
 * - anything else — 404 `not_found`.
 *
 * @returns The listening `http.Server` instance.
 */
export function startServer() {
  const port = Number(process.env.PORT || 8080);

  const server = createServer((req, res) => {
    // The listener's return value is ignored by node:http, so a rejected promise
    // would surface as an unhandled rejection. Catch it here and answer 500.
    routeRequest(req, res).catch((error: unknown) => {
      const message = error instanceof Error ? error.message : String(error);
      console.error(`[omni_inbound] unhandled request error: ${message}`);
      if (res.headersSent) {
        res.end();
        return;
      }
      writeJson(res, 500, { ok: false, error: "internal_error" });
    });
  });

  server.listen(port, "0.0.0.0", () => {
    console.log(`[omni_inbound] listening on :${port}`);
  });

  return server;
}