Files
clientsflow/backend/src/service.ts
2026-03-08 19:15:30 +07:00

638 lines
16 KiB
TypeScript

/** Message direction tag. `normalizeDirection` maps every value other than "OUT" to "IN". */
type MessageDirection = "IN" | "OUT";
/** Status values written by `reportTelegramOutbound`; reported statuses outside this set are stored as "FAILED". */
type OmniMessageStatus = "PENDING" | "SENT" | "FAILED" | "DELIVERED" | "READ";
import { prisma } from "./utils/prisma";
/**
 * Envelope describing one provider event, as consumed by
 * `ingestTelegramInbound`.
 *
 * Envelopes whose `channel` is not "TELEGRAM" are acknowledged and skipped.
 */
export type TelegramInboundEnvelope = {
// Copied into the stored raw JSON as-is.
version: number;
// NOTE(review): not read anywhere in the visible part of this file.
idempotencyKey: string;
provider: string;
// Must be exactly "TELEGRAM" for the envelope to be ingested.
channel: string;
// At runtime any value other than "OUT" is treated as "IN" (see normalizeDirection).
direction: "IN" | "OUT";
// Persisted as `providerUpdateId` on the omni message.
providerEventId: string;
// When truthy, the omni message is upserted by (threadId, providerMessageId);
// otherwise a new omni message row is created on every call.
providerMessageId: string | null;
// "read_business_message" only marks the thread as read; every other value
// goes through the message-ingest path.
eventType: string;
// Parsed with `new Date(...)`; an unparseable value falls back to the current time.
occurredAt: string;
receivedAt: string;
payloadRaw: unknown;
payloadNormalized: {
// Chat id. Ingest throws when both this and `contactExternalId` are missing or blank.
threadExternalId: string | null;
// Contact id; falls back to the chat id when missing or blank.
contactExternalId: string | null;
// Blank or missing text is stored as "[no text]".
text: string | null;
// Tried first when resolving the team; also part of the thread lookup key.
businessConnectionId: string | null;
// Other keys read by the ingest path: contactFirstName, contactLastName,
// contactUsername, contactAvatarUrl, chatTitle.
[key: string]: unknown;
};
};
/** Delivery report for an existing omni message, consumed by `reportTelegramOutbound`. */
export type TelegramOutboundReport = {
// Id of the omniMessage row to update.
omniMessageId: string;
// Trimmed and upper-cased; anything other than SENT/FAILED/DELIVERED/READ/PENDING is stored as "FAILED".
status: string;
// Written to the message only when truthy.
providerMessageId?: string | null;
// Stored under rawJson.telegramWorker.error (null when absent).
error?: string | null;
// Stored under rawJson.telegramWorker.response: parsed as JSON when possible, otherwise kept as the raw string.
responseJson?: string | null;
};
/**
 * Outbound send request. `requestTelegramOutbound` forwards it unchanged as the
 * `input` variable of the `enqueueTelegramOutbound` GraphQL mutation.
 */
export type TelegramOutboundRequest = {
omniMessageId: string;
chatId: string;
text: string;
businessConnectionId?: string | null;
};
/** Summary returned by `syncCalendarPredueTimeline` for one scheduler run. */
export type CalendarPredueSyncResult = {
// Always true in the visible code paths; failures surface as thrown errors instead.
ok: boolean;
// "lock_busy_skip" when the advisory lock was not obtained, otherwise "calendar_predue_synced".
message: string;
// ISO timestamp captured at the start of the run.
now: string;
// Number of calendar events fetched for the time window.
scanned: number;
// Number of timeline entries upserted.
updated: number;
// Events whose pre-due moment is still in the future.
skippedBeforeWindow: number;
// True when the run did nothing because the lock was busy.
skippedLocked: boolean;
// Effective settings used for this run (after env parsing and clamping).
preDueMinutes: number;
lookbackMinutes: number;
lookaheadMinutes: number;
lockKey: number;
};
/**
 * Normalizes an arbitrary value to a trimmed, non-empty string.
 *
 * @param value - Anything; only real strings are considered.
 * @returns The trimmed string, or `null` for non-strings and blank strings.
 */
function asString(value: unknown) {
  const trimmed = typeof value === "string" ? value.trim() : "";
  return trimmed.length > 0 ? trimmed : null;
}
/**
 * Parses a date string leniently.
 *
 * @param value - Any string accepted by the `Date` constructor.
 * @returns The parsed date, or the current time when the input is unparseable.
 */
function parseDate(value: string) {
  const parsed = new Date(value);
  const isValid = !Number.isNaN(parsed.getTime());
  return isValid ? parsed : new Date();
}
/**
 * Coerces a free-form direction string into a `MessageDirection`.
 *
 * @param value - Direction as received; compared case-sensitively.
 * @returns "OUT" only for the exact string "OUT"; "IN" for everything else.
 */
function normalizeDirection(value: string): MessageDirection {
  if (value === "OUT") {
    return "OUT";
  }
  return "IN";
}
/**
 * Reads an integer setting from the process environment.
 *
 * @param name - Environment variable name.
 * @param defaultValue - Returned when the variable is unset, blank, or does
 *   not start with a base-10 integer.
 * @returns The parsed integer or `defaultValue`.
 */
function readIntEnv(name: string, defaultValue: number) {
  const text = asString(process.env[name]);
  if (text === null) {
    return defaultValue;
  }
  const value = Number.parseInt(text, 10);
  if (!Number.isFinite(value)) {
    return defaultValue;
  }
  return value;
}
/**
 * Determines which team an inbound envelope belongs to.
 *
 * Resolution order:
 *  1. a business-connection row matching `payloadNormalized.businessConnectionId`;
 *  2. a business-connection row keyed `link:<contact or thread external id>`;
 *  3. the `DEFAULT_TEAM_ID` environment variable;
 *  4. the team with the earliest `createdAt`.
 *
 * @returns The team id, or `null` when no team exists at all.
 */
async function resolveTeamId(envelope: TelegramInboundEnvelope) {
  const normalized = envelope.payloadNormalized;
  const directConnectionId = asString(normalized.businessConnectionId);
  const externalId = asString(normalized.contactExternalId) ?? asString(normalized.threadExternalId);

  // Candidate connection keys, tried strictly in this order.
  const candidates = [directConnectionId, externalId ? `link:${externalId}` : null];
  for (const candidate of candidates) {
    if (!candidate) continue;
    const connection = await prisma.telegramBusinessConnection.findFirst({
      where: { businessConnectionId: candidate },
      orderBy: { updatedAt: "desc" },
      select: { teamId: true },
    });
    if (connection?.teamId) return connection.teamId;
  }

  const envTeamId = asString(process.env.DEFAULT_TEAM_ID);
  if (envTeamId) return envTeamId;

  const oldestTeam = await prisma.team.findFirst({
    orderBy: { createdAt: "asc" },
    select: { id: true },
  });
  return oldestTeam?.id ?? null;
}
/**
 * Finds the contact linked to a Telegram external id, creating the contact and
 * its identity row when none exists yet.
 *
 * @param input.teamId - Owning team.
 * @param input.externalContactId - Telegram-side contact id.
 * @param input.displayName - Name used only when a new contact is created.
 * @param input.avatarUrl - Avatar used only when a new contact is created.
 * @returns The id of the existing or newly created contact.
 * @throws Error("failed to create telegram contact identity") when the identity
 *   could not be created and no concurrently created identity was found.
 */
async function resolveContact(input: {
  teamId: string;
  externalContactId: string;
  displayName: string;
  avatarUrl: string | null;
}) {
  const existing = await prisma.omniContactIdentity.findFirst({
    where: {
      teamId: input.teamId,
      channel: "TELEGRAM",
      externalId: input.externalContactId,
    },
    select: { contactId: true },
  });
  if (existing?.contactId) {
    return existing.contactId;
  }

  const contact = await prisma.contact.create({
    data: {
      teamId: input.teamId,
      name: input.displayName,
      avatarUrl: input.avatarUrl,
    },
    select: { id: true },
  });

  try {
    await prisma.omniContactIdentity.create({
      data: {
        teamId: input.teamId,
        contactId: contact.id,
        channel: "TELEGRAM",
        externalId: input.externalContactId,
      },
    });
    return contact.id;
  } catch {
    // Identity creation failed. The usual cause is a concurrent ingest that
    // created the identity first; in that case adopt the winner's contact.
    try {
      const concurrent = await prisma.omniContactIdentity.findFirst({
        where: {
          teamId: input.teamId,
          channel: "TELEGRAM",
          externalId: input.externalContactId,
        },
        select: { contactId: true },
      });
      if (concurrent?.contactId) {
        return concurrent.contactId;
      }
      throw new Error("failed to create telegram contact identity");
    } finally {
      // The contact created above has no identity attached on any path through
      // this handler, so remove it (best effort) instead of leaving an orphan
      // row behind — including when we are about to throw.
      await prisma.contact.delete({ where: { id: contact.id } }).catch(() => undefined);
    }
  }
}
/**
 * Returns the id of the Telegram thread identified by
 * (team, external chat id, business connection id), creating it if needed.
 *
 * An existing thread is re-pointed at `contactId` and, when a title is given,
 * retitled. If creation fails, the thread is looked up once more to pick up a
 * row created concurrently.
 *
 * @throws Error("failed to upsert telegram thread") when creation fails and no
 *   matching thread can be found afterwards.
 */
async function upsertThread(input: {
  teamId: string;
  contactId: string;
  externalChatId: string;
  businessConnectionId: string | null;
  title: string | null;
}) {
  const lookupThread = () =>
    prisma.omniThread.findFirst({
      where: {
        teamId: input.teamId,
        channel: "TELEGRAM",
        externalChatId: input.externalChatId,
        businessConnectionId: input.businessConnectionId,
      },
      select: { id: true },
    });

  const known = await lookupThread();
  if (known) {
    // Only overwrite the title when the caller actually supplied one.
    const patch = input.title
      ? { contactId: input.contactId, title: input.title }
      : { contactId: input.contactId };
    await prisma.omniThread.update({
      where: { id: known.id },
      data: patch,
      select: { id: true },
    });
    return known.id;
  }

  try {
    const { id } = await prisma.omniThread.create({
      data: {
        teamId: input.teamId,
        contactId: input.contactId,
        channel: "TELEGRAM",
        externalChatId: input.externalChatId,
        businessConnectionId: input.businessConnectionId,
        title: input.title,
      },
      select: { id: true },
    });
    return id;
  } catch {
    const raced = await lookupThread();
    if (raced?.id) return raced.id;
    throw new Error("failed to upsert telegram thread");
  }
}
/**
 * Creates or updates the Telegram contact inbox keyed by
 * (team, channel, source external id).
 *
 * On update the inbox is re-pointed at `contactId`; the title is replaced only
 * when a non-empty title is supplied.
 *
 * @returns The inbox id.
 */
async function upsertContactInbox(input: {
  teamId: string;
  contactId: string;
  sourceExternalId: string;
  title: string | null;
}) {
  const { teamId, contactId, sourceExternalId, title } = input;
  const { id } = await prisma.contactInbox.upsert({
    where: {
      teamId_channel_sourceExternalId: { teamId, channel: "TELEGRAM", sourceExternalId },
    },
    create: { teamId, contactId, channel: "TELEGRAM", sourceExternalId, title },
    update: title ? { contactId, title } : { contactId },
    select: { id: true },
  });
  return id;
}
/**
 * Records "read now" for every member of the team on the contact behind the
 * given Telegram chat. Does nothing when no matching thread exists.
 *
 * @param teamId - Team whose members get the read marker.
 * @param externalChatId - Telegram chat id used to find the thread.
 */
async function markRead(teamId: string, externalChatId: string) {
  const thread = await prisma.omniThread.findFirst({
    where: { teamId, channel: "TELEGRAM", externalChatId },
    select: { contactId: true },
  });
  if (!thread) return;

  const { contactId } = thread;
  const members = await prisma.teamMember.findMany({
    where: { teamId },
    select: { userId: true },
  });

  // One shared timestamp so all members get the same read moment.
  const readAt = new Date();
  const upserts = members.map(({ userId }: { userId: string }) =>
    prisma.contactThreadRead.upsert({
      where: { userId_contactId: { userId, contactId } },
      create: { teamId, userId, contactId, readAt },
      update: { readAt },
    }),
  );
  await Promise.all(upserts);
}
/**
 * Ingests one Telegram provider event.
 *
 * Flow:
 *  - non-"TELEGRAM" envelopes are skipped;
 *  - "read_business_message" events only mark the thread as read;
 *  - everything else resolves (or creates) the contact, thread and inbox, then
 *    stores the omni message together with a contact timeline message.
 *
 * Idempotency: when `providerMessageId` is present the omni message is upserted
 * by (threadId, providerMessageId). If the stored row already carries the same
 * `providerEventId`, the call is treated as a redelivery of the same event and
 * no additional contact message is written. A different event id for the same
 * provider message (e.g. an edit) still appends a contact message, as before.
 * The omni message and the contact message are written in one batch
 * transaction so a failure cannot persist one without the other.
 *
 * @returns `{ ok, message }`, plus `omniMessageId` when a message was stored.
 * @throws Error("team_not_resolved") when no team can be determined.
 * @throws Error("thread_external_id_required") when neither a thread nor a
 *   contact external id is present.
 */
export async function ingestTelegramInbound(envelope: TelegramInboundEnvelope) {
  if (envelope.channel !== "TELEGRAM") {
    return { ok: true, message: "skip_non_telegram" };
  }

  const teamId = await resolveTeamId(envelope);
  if (!teamId) {
    throw new Error("team_not_resolved");
  }

  const n = envelope.payloadNormalized;
  const externalChatId = asString(n.threadExternalId) ?? asString(n.contactExternalId);
  if (!externalChatId) {
    throw new Error("thread_external_id_required");
  }

  if (envelope.eventType === "read_business_message") {
    await markRead(teamId, externalChatId);
    return { ok: true, message: "read_marked" };
  }

  const externalContactId = asString(n.contactExternalId) ?? externalChatId;
  const businessConnectionId = asString(n.businessConnectionId);
  const text = asString(n.text) ?? "[no text]";
  const occurredAt = parseDate(envelope.occurredAt);
  const direction = normalizeDirection(envelope.direction);

  // Display name preference: "First Last" -> "@username" -> "Telegram <id>".
  const contactFirstName = asString(n.contactFirstName);
  const contactLastName = asString(n.contactLastName);
  const contactUsername = asString(n.contactUsername);
  const fallbackName = `Telegram ${externalContactId}`;
  const displayName =
    [contactFirstName, contactLastName].filter(Boolean).join(" ") ||
    (contactUsername ? `@${contactUsername.replace(/^@/, "")}` : null) ||
    fallbackName;

  const contactId = await resolveContact({
    teamId,
    externalContactId,
    displayName,
    avatarUrl: asString(n.contactAvatarUrl),
  });
  const threadId = await upsertThread({
    teamId,
    contactId,
    externalChatId,
    businessConnectionId,
    title: asString(n.chatTitle),
  });
  const contactInboxId = await upsertContactInbox({
    teamId,
    contactId,
    sourceExternalId: externalChatId,
    title: asString(n.chatTitle),
  });

  const rawEnvelope: Record<string, unknown> = {
    version: envelope.version,
    source: "backend.graphql.ingestTelegramInbound",
    provider: envelope.provider,
    channel: envelope.channel,
    direction,
    providerEventId: envelope.providerEventId,
    receivedAt: envelope.receivedAt,
    occurredAt: occurredAt.toISOString(),
    payloadNormalized: n,
    payloadRaw: envelope.payloadRaw ?? null,
  };

  // Built lazily: the query only runs once awaited or handed to $transaction.
  const createContactMessage = () =>
    prisma.contactMessage.create({
      data: {
        contactId,
        contactInboxId,
        kind: "MESSAGE",
        direction,
        channel: "TELEGRAM",
        content: text,
        occurredAt,
      },
    });

  let omniMessageId: string;
  if (envelope.providerMessageId) {
    const providerMessageId = envelope.providerMessageId;

    // Same provider message AND same provider event already stored => this is
    // a redelivery of an event we have fully processed.
    const prior = await prisma.omniMessage.findUnique({
      where: { threadId_providerMessageId: { threadId, providerMessageId } },
      select: { providerUpdateId: true },
    });
    const isReplay = prior != null && prior.providerUpdateId === envelope.providerEventId;

    const upsertMessage = () =>
      prisma.omniMessage.upsert({
        where: { threadId_providerMessageId: { threadId, providerMessageId } },
        create: {
          teamId,
          contactId,
          threadId,
          direction,
          channel: "TELEGRAM",
          status: "DELIVERED",
          text,
          providerMessageId,
          providerUpdateId: envelope.providerEventId,
          rawJson: rawEnvelope,
          occurredAt,
        },
        update: {
          text,
          providerUpdateId: envelope.providerEventId,
          rawJson: rawEnvelope,
          occurredAt,
        },
        select: { id: true },
      });

    if (isReplay) {
      // Refresh the stored message but do not append a duplicate timeline row.
      const message = await upsertMessage();
      omniMessageId = message.id;
    } else {
      const [message] = await prisma.$transaction([upsertMessage(), createContactMessage()]);
      omniMessageId = message.id;
    }
  } else {
    // Without a provider message id there is no key to deduplicate on.
    const [message] = await prisma.$transaction([
      prisma.omniMessage.create({
        data: {
          teamId,
          contactId,
          threadId,
          direction,
          channel: "TELEGRAM",
          status: "DELIVERED",
          text,
          providerMessageId: null,
          providerUpdateId: envelope.providerEventId,
          rawJson: rawEnvelope,
          occurredAt,
        },
        select: { id: true },
      }),
      createContactMessage(),
    ]);
    omniMessageId = message.id;
  }

  return { ok: true, message: "inbound_ingested", omniMessageId };
}
/**
 * Applies a worker's delivery report to an omni message.
 *
 * The reported status is trimmed and upper-cased; unknown statuses are stored
 * as "FAILED". The report (timestamp, status, error, response) is merged into
 * the message's `rawJson` under the `telegramWorker` key, keeping any other
 * keys of an existing object-shaped `rawJson`.
 *
 * @returns `{ ok: true, message: "outbound_reported" }`.
 */
export async function reportTelegramOutbound(input: TelegramOutboundReport) {
  const knownStatuses: readonly OmniMessageStatus[] = ["SENT", "FAILED", "DELIVERED", "READ", "PENDING"];
  const reported = input.status.trim().toUpperCase();
  const status: OmniMessageStatus = knownStatuses.find((candidate) => candidate === reported) ?? "FAILED";

  const stored = await prisma.omniMessage.findUnique({
    where: { id: input.omniMessageId },
    select: { rawJson: true },
  });
  const previousRaw = stored?.rawJson;
  const isPlainObject = Boolean(previousRaw) && typeof previousRaw === "object" && !Array.isArray(previousRaw);
  const raw = (isPlainObject ? (previousRaw as Record<string, unknown>) : {}) as Record<string, unknown>;

  // Worker responses are expected to be JSON, but keep the raw text if not.
  const parseResponse = () => {
    if (!input.responseJson) return null;
    try {
      return JSON.parse(input.responseJson);
    } catch {
      return input.responseJson;
    }
  };

  await prisma.omniMessage.update({
    where: { id: input.omniMessageId },
    data: {
      status,
      ...(input.providerMessageId ? { providerMessageId: input.providerMessageId } : {}),
      rawJson: {
        ...raw,
        telegramWorker: {
          reportedAt: new Date().toISOString(),
          status,
          error: input.error ?? null,
          response: parseResponse(),
        },
      },
    },
  });

  return { ok: true, message: "outbound_reported" };
}
/**
 * Sends a GraphQL request to the Telegram backend.
 *
 * Configuration comes from the environment:
 *  - `TELEGRAM_BACKEND_GRAPHQL_URL` (required);
 *  - `TELEGRAM_BACKEND_GRAPHQL_SHARED_SECRET` (optional, sent as `x-graphql-secret`).
 *
 * The response body is read as text and parsed defensively, so a non-JSON
 * error page (e.g. from a proxy) is reported as `HTTP <status>` instead of a
 * JSON syntax error.
 *
 * @param query - GraphQL document.
 * @param variables - Variables for the document.
 * @returns The `data` member of the GraphQL response.
 * @throws Error when the URL is not configured, the HTTP status is not OK, the
 *   response carries GraphQL errors, or a successful response is not a JSON
 *   object.
 */
async function callTelegramBackendGraphql<T>(query: string, variables: Record<string, unknown>) {
  type GraphqlPayload = { data?: T; errors?: Array<{ message?: string }> };

  const url = asString(process.env.TELEGRAM_BACKEND_GRAPHQL_URL);
  if (!url) {
    throw new Error("TELEGRAM_BACKEND_GRAPHQL_URL is required");
  }

  const headers: Record<string, string> = {
    "content-type": "application/json",
  };
  const secret = asString(process.env.TELEGRAM_BACKEND_GRAPHQL_SHARED_SECRET);
  if (secret) {
    headers["x-graphql-secret"] = secret;
  }

  const response = await fetch(url, {
    method: "POST",
    headers,
    body: JSON.stringify({ query, variables }),
  });

  // Parse manually: response.json() would throw before the status is checked.
  const bodyText = await response.text();
  let payload: GraphqlPayload | null = null;
  try {
    const parsed: unknown = JSON.parse(bodyText);
    if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) {
      payload = parsed as GraphqlPayload;
    }
  } catch {
    payload = null;
  }

  const rawErrors = payload?.errors;
  const errors = Array.isArray(rawErrors) ? rawErrors : [];
  if (!response.ok || errors.length > 0) {
    const errorMessage =
      errors
        .map((e) => e?.message)
        .filter(Boolean)
        .join("; ") || `HTTP ${response.status}`;
    throw new Error(errorMessage);
  }
  if (!payload) {
    throw new Error(`invalid JSON response from telegram backend (HTTP ${response.status})`);
  }
  return payload.data as T;
}
/**
 * Asks the Telegram backend to enqueue an outbound message via the
 * `enqueueTelegramOutbound` GraphQL mutation.
 *
 * @param input - Forwarded unchanged as the mutation's `input` variable.
 * @returns `{ ok: true, message: "outbound_enqueued", runId }` where `runId`
 *   is `null` when the backend did not return one.
 * @throws Error with the backend's message (or "enqueue failed") when the
 *   backend does not report `ok`.
 */
export async function requestTelegramOutbound(input: TelegramOutboundRequest) {
  type EnqueueResponse = {
    enqueueTelegramOutbound: {
      ok: boolean;
      message: string;
      runId?: string | null;
    };
  };

  // The document text is kept exactly as sent today (including its layout).
  const mutation = `mutation Enqueue($input: TelegramOutboundTaskInput!) {
enqueueTelegramOutbound(input: $input) {
ok
message
runId
}
}`;

  const response = await callTelegramBackendGraphql<EnqueueResponse>(mutation, { input });
  const outcome = response.enqueueTelegramOutbound;
  if (outcome?.ok) {
    return { ok: true, message: "outbound_enqueued", runId: outcome.runId ?? null };
  }
  throw new Error(outcome?.message || "enqueue failed");
}
/**
 * Upserts a client-timeline entry for every non-archived calendar event with a
 * contact whose "pre-due" moment (`startsAt - preDueMinutes`) has been reached.
 *
 * Settings (environment, with defaults):
 *  - `TIMELINE_EVENT_PREDUE_MINUTES` (30, min 1);
 *  - `TIMELINE_EVENT_LOOKBACK_MINUTES` (180, at least the pre-due minutes);
 *  - `TIMELINE_EVENT_LOOKAHEAD_MINUTES` (1440, at least the pre-due minutes);
 *  - `TIMELINE_SCHEDULER_LOCK_KEY` (603001) — PostgreSQL advisory lock key;
 *  - `TIMELINE_SCHEDULER_TX_TIMEOUT_MS` (120000, min 5000) — upper bound for
 *    the whole run.
 *
 * Locking: a session-level advisory lock cannot be used reliably through a
 * connection pool, because the lock and the unlock may run on different
 * connections and the lock then stays held. The run therefore executes inside
 * one interactive transaction and takes a transaction-scoped advisory lock,
 * which PostgreSQL releases automatically on commit or rollback. As a
 * consequence, all upserts of one run are committed together.
 *
 * @returns A summary of the run; `skippedLocked` is true when another run
 *   holds the lock and nothing was done.
 */
export async function syncCalendarPredueTimeline(): Promise<CalendarPredueSyncResult> {
  // Only the client members used below; compatible with the transaction client.
  type LockedClient = Pick<typeof prisma, "$queryRaw" | "calendarEvent" | "clientTimelineEntry">;

  const preDueMinutes = Math.max(1, readIntEnv("TIMELINE_EVENT_PREDUE_MINUTES", 30));
  const lookbackMinutes = Math.max(preDueMinutes, readIntEnv("TIMELINE_EVENT_LOOKBACK_MINUTES", 180));
  const lookaheadMinutes = Math.max(preDueMinutes, readIntEnv("TIMELINE_EVENT_LOOKAHEAD_MINUTES", 1440));
  const lockKey = readIntEnv("TIMELINE_SCHEDULER_LOCK_KEY", 603001);
  const txTimeoutMs = Math.max(5_000, readIntEnv("TIMELINE_SCHEDULER_TX_TIMEOUT_MS", 120_000));

  const now = new Date();
  const rangeStart = new Date(now.getTime() - lookbackMinutes * 60_000);
  const rangeEnd = new Date(now.getTime() + lookaheadMinutes * 60_000);

  // Fields shared by both possible results.
  const settings = {
    now: now.toISOString(),
    preDueMinutes,
    lookbackMinutes,
    lookaheadMinutes,
    lockKey,
  };

  return prisma.$transaction(
    async (tx: LockedClient): Promise<CalendarPredueSyncResult> => {
      const lockRows = await tx.$queryRaw<Array<{ locked: boolean }>>`
        SELECT pg_try_advisory_xact_lock(${lockKey}) AS locked
      `;
      const locked = Boolean(lockRows?.[0]?.locked);
      if (!locked) {
        return {
          ok: true,
          message: "lock_busy_skip",
          scanned: 0,
          updated: 0,
          skippedBeforeWindow: 0,
          skippedLocked: true,
          ...settings,
        };
      }

      const events = await tx.calendarEvent.findMany({
        where: {
          isArchived: false,
          contactId: { not: null },
          startsAt: {
            gte: rangeStart,
            lte: rangeEnd,
          },
        },
        orderBy: { startsAt: "asc" },
        select: {
          id: true,
          teamId: true,
          contactId: true,
          startsAt: true,
        },
      });

      let updated = 0;
      let skippedBeforeWindow = 0;
      for (const event of events) {
        if (!event.contactId) continue;
        const preDueAt = new Date(event.startsAt.getTime() - preDueMinutes * 60_000);
        if (now < preDueAt) {
          // Too early: the entry is written once the pre-due moment is reached.
          skippedBeforeWindow += 1;
          continue;
        }
        await tx.clientTimelineEntry.upsert({
          where: {
            teamId_contentType_contentId: {
              teamId: event.teamId,
              contentType: "CALENDAR_EVENT",
              contentId: event.id,
            },
          },
          create: {
            teamId: event.teamId,
            contactId: event.contactId,
            contentType: "CALENDAR_EVENT",
            contentId: event.id,
            datetime: preDueAt,
          },
          update: {
            contactId: event.contactId,
            datetime: preDueAt,
          },
        });
        updated += 1;
      }

      return {
        ok: true,
        message: "calendar_predue_synced",
        scanned: events.length,
        updated,
        skippedBeforeWindow,
        skippedLocked: false,
        ...settings,
      };
    },
    // The default interactive-transaction timeout is too short for a full scan.
    { maxWait: 10_000, timeout: txTimeoutMs },
  );
}