Add Telegram avatar proxy and realtime CRM websocket updates

This commit is contained in:
Ruslan Bakiev
2026-02-23 08:09:53 +07:00
parent f81a0fde55
commit 0f1028b0fa
5 changed files with 412 additions and 4 deletions

View File

@@ -0,0 +1,63 @@
import { getRouterParam, setHeader } from "h3";
import { getAuthContext } from "../../../../utils/auth";
import { requireTelegramBotToken, telegramApiBase, telegramBotApi } from "../../../../utils/telegram";
type TelegramProfilePhotoSize = {
file_id?: string;
};
type TelegramUserProfilePhotosResult = {
photos?: TelegramProfilePhotoSize[][];
};
type TelegramGetFileResult = {
file_path?: string;
};
function parseUserId(input: string) {
if (!/^\d+$/.test(input)) return null;
const value = Number(input);
if (!Number.isSafeInteger(value) || value <= 0) return null;
return value;
}
export default defineEventHandler(async (event) => {
await getAuthContext(event);
const rawUserId = String(getRouterParam(event, "userId") ?? "").trim();
const userId = parseUserId(rawUserId);
if (!userId) {
throw createError({ statusCode: 400, statusMessage: "invalid telegram user id" });
}
const profile = await telegramBotApi<TelegramUserProfilePhotosResult>("getUserProfilePhotos", {
user_id: userId,
limit: 1,
});
const sizes = profile.photos?.[0] ?? [];
const best = sizes[sizes.length - 1];
const fileId = String(best?.file_id ?? "").trim();
if (!fileId) {
throw createError({ statusCode: 404, statusMessage: "avatar not found" });
}
const fileMeta = await telegramBotApi<TelegramGetFileResult>("getFile", { file_id: fileId });
const filePath = String(fileMeta.file_path ?? "").trim();
if (!filePath) {
throw createError({ statusCode: 404, statusMessage: "avatar path not found" });
}
const token = requireTelegramBotToken();
const upstream = await fetch(`${telegramApiBase()}/file/bot${token}/${filePath}`);
if (!upstream.ok) {
throw createError({ statusCode: 502, statusMessage: "failed to load telegram avatar" });
}
const contentType = upstream.headers.get("content-type") || "image/jpeg";
const buffer = Buffer.from(await upstream.arrayBuffer());
setHeader(event, "content-type", contentType);
setHeader(event, "cache-control", "private, max-age=300");
return buffer;
});

View File

@@ -0,0 +1,165 @@
import { prisma } from "../../utils/prisma";
const COOKIE_USER = "cf_user";
const COOKIE_TEAM = "cf_team";
const COOKIE_CONV = "cf_conv";
const TEAM_POLL_INTERVAL_MS = 2000;
const peersByTeam = new Map<string, Set<any>>();
const peerTeamById = new Map<string, string>();
const lastSignatureByTeam = new Map<string, string>();
let pollTimer: ReturnType<typeof setInterval> | null = null;
function parseCookies(raw: string | null) {
const out = new Map<string, string>();
for (const part of String(raw ?? "").split(";")) {
const [key, ...rest] = part.trim().split("=");
if (!key) continue;
const value = rest.join("=");
try {
out.set(key, decodeURIComponent(value));
} catch {
out.set(key, value);
}
}
return out;
}
function attachPeerToTeam(peer: any, teamId: string) {
if (!peersByTeam.has(teamId)) peersByTeam.set(teamId, new Set());
peersByTeam.get(teamId)?.add(peer);
peerTeamById.set(String(peer.id), teamId);
}
function detachPeer(peer: any) {
const key = String(peer.id);
const teamId = peerTeamById.get(key);
if (!teamId) return;
peerTeamById.delete(key);
const peers = peersByTeam.get(teamId);
if (!peers) return;
peers.delete(peer);
if (peers.size === 0) {
peersByTeam.delete(teamId);
lastSignatureByTeam.delete(teamId);
}
}
function stopPollIfIdle() {
if (peersByTeam.size > 0 || !pollTimer) return;
clearInterval(pollTimer);
pollTimer = null;
}
async function validateSessionFromPeer(peer: any) {
const cookieHeader = peer?.request?.headers?.get?.("cookie") ?? null;
const cookies = parseCookies(cookieHeader);
const userId = String(cookies.get(COOKIE_USER) ?? "").trim();
const teamId = String(cookies.get(COOKIE_TEAM) ?? "").trim();
const conversationId = String(cookies.get(COOKIE_CONV) ?? "").trim();
if (!userId || !teamId || !conversationId) return null;
const [user, team, conv] = await Promise.all([
prisma.user.findUnique({ where: { id: userId }, select: { id: true } }),
prisma.team.findUnique({ where: { id: teamId }, select: { id: true } }),
prisma.chatConversation.findFirst({
where: { id: conversationId, teamId, createdByUserId: userId },
select: { id: true },
}),
]);
if (!user || !team || !conv) return null;
return { teamId };
}
async function computeTeamSignature(teamId: string) {
const [omniMessageMax, contactMax, contactMessageMax, telegramConnectionMax] = await Promise.all([
prisma.omniMessage.aggregate({
where: { teamId },
_max: { updatedAt: true },
}),
prisma.contact.aggregate({
where: { teamId },
_max: { updatedAt: true },
}),
prisma.contactMessage.aggregate({
where: { contact: { teamId } },
_max: { createdAt: true },
}),
prisma.telegramBusinessConnection.aggregate({
where: { teamId },
_max: { updatedAt: true },
}),
]);
return [
omniMessageMax._max.updatedAt?.toISOString() ?? "",
contactMax._max.updatedAt?.toISOString() ?? "",
contactMessageMax._max.createdAt?.toISOString() ?? "",
telegramConnectionMax._max.updatedAt?.toISOString() ?? "",
].join("|");
}
function sendJson(peer: any, payload: Record<string, unknown>) {
try {
peer.send(JSON.stringify(payload));
} catch {
// ignore socket write errors
}
}
async function pollAndBroadcast() {
for (const [teamId, peers] of peersByTeam.entries()) {
if (!peers.size) continue;
const signature = await computeTeamSignature(teamId);
const previous = lastSignatureByTeam.get(teamId);
if (signature === previous) continue;
lastSignatureByTeam.set(teamId, signature);
const payload = {
type: "dashboard.changed",
teamId,
at: new Date().toISOString(),
};
for (const peer of peers) {
sendJson(peer, payload);
}
}
}
function ensurePoll() {
if (pollTimer) return;
pollTimer = setInterval(() => {
void pollAndBroadcast();
}, TEAM_POLL_INTERVAL_MS);
}
export default defineWebSocketHandler({
async open(peer) {
const session = await validateSessionFromPeer(peer);
if (!session) {
peer.close(4401, "Unauthorized");
return;
}
attachPeerToTeam(peer, session.teamId);
ensurePoll();
sendJson(peer, { type: "realtime.connected", at: new Date().toISOString() });
void pollAndBroadcast();
},
close(peer) {
detachPeer(peer);
stopPollIfIdle();
},
error(peer) {
detachPeer(peer);
stopPollIfIdle();
},
});