From 0f87586e81de63813b2a0367cdfe76bea5289e3c Mon Sep 17 00:00:00 2001 From: Ruslan Bakiev <572431+veikab@users.noreply.github.com> Date: Wed, 25 Feb 2026 14:53:55 +0700 Subject: [PATCH] fix: OUT messages no longer create unread status + handle Telegram read receipts Only inbound (IN) messages determine hasUnread in getContacts(). Telegram read_business_message events are now parsed and processed to auto-mark contacts as read for the entire team. Co-Authored-By: Claude Opus 4.6 --- frontend/server/graphql/schema.ts | 2 +- omni_chat/src/worker.ts | 37 +++++++++++++++++++++++++++++++ omni_inbound/src/telegram.ts | 36 +++++++++++++++++++++++++++++- 3 files changed, 73 insertions(+), 2 deletions(-) diff --git a/frontend/server/graphql/schema.ts b/frontend/server/graphql/schema.ts index 7aebd68..38be195 100644 --- a/frontend/server/graphql/schema.ts +++ b/frontend/server/graphql/schema.ts @@ -426,7 +426,7 @@ async function getContacts(auth: AuthContext | null) { include: { note: { select: { content: true } }, messages: { - ...(messageWhere ? { where: messageWhere } : {}), + where: { direction: "IN", ...(messageWhere ?? {}) }, select: { content: true, channel: true, occurredAt: true }, orderBy: { occurredAt: "desc" as const }, take: 1, diff --git a/omni_chat/src/worker.ts b/omni_chat/src/worker.ts index 795e232..001f429 100644 --- a/omni_chat/src/worker.ts +++ b/omni_chat/src/worker.ts @@ -445,9 +445,46 @@ async function upsertContactInbox(input: { }); } +async function handleReadBusinessMessage(env: OmniInboundEnvelopeV1) { + const teamId = await resolveTeamId(env); + if (!teamId) return; + + const n = env.payloadNormalized ?? ({} as OmniInboundEnvelopeV1["payloadNormalized"]); + const externalChatId = String(n.threadExternalId ?? n.contactExternalId ?? "").trim(); + if (!externalChatId) return; + + const thread = await prisma.omniThread.findFirst({ + where: { teamId, channel: "TELEGRAM", externalChatId }, + select: { contactId: true }, + }); + if (!thread) return; + + const teamUsers = await prisma.teamMember.findMany({ + where: { teamId }, + select: { userId: true }, + }); + const now = new Date(); + // ContactThreadRead is not in omni_chat's Prisma schema, use raw upsert + await Promise.all( + teamUsers.map((u) => + prisma.$executeRaw` + INSERT INTO "ContactThreadRead" ("id", "teamId", "userId", "contactId", "readAt") + VALUES (gen_random_uuid(), ${teamId}, ${u.userId}, ${thread.contactId}, ${now}) + ON CONFLICT ("userId", "contactId") DO UPDATE SET "readAt" = ${now} + `, + ), + ); + console.log(`[omni_chat] read_business_message: marked contact ${thread.contactId} as read for ${teamUsers.length} users`); +} + async function ingestInbound(env: OmniInboundEnvelopeV1) { if (env.channel !== "TELEGRAM") return; + if (env.eventType === "read_business_message") { + await handleReadBusinessMessage(env); + return; + } + const teamId = await resolveTeamId(env); if (!teamId) { console.warn("[omni_chat] skip inbound: team not resolved", env.providerEventId); diff --git a/omni_inbound/src/telegram.ts b/omni_inbound/src/telegram.ts index 4b8b0d3..d1ad734 100644 --- a/omni_inbound/src/telegram.ts +++ b/omni_inbound/src/telegram.ts @@ -30,6 +30,7 @@ function pickEventType(update: JsonObject): string { if (update.edited_business_message) return "edited_business_message"; if (update.business_connection) return "business_connection"; if (update.deleted_business_messages) return "deleted_business_messages"; + if (update.read_business_message) return "read_business_message"; if (update.message) return "message"; if (update.edited_message) return "edited_message"; return "unknown"; @@ -148,9 +149,42 @@ function makeFallbackEventId(raw: unknown) { export function parseTelegramBusinessUpdate(raw: unknown): OmniInboundEnvelopeV1 { const update = asObject(raw); - const message = pickMessage(update); const receivedAt = new Date().toISOString(); + // Handle read_business_message separately — different payload structure + const readEvent = asObject(update.read_business_message); + if (Object.keys(readEvent).length > 0) { + const readChat = asObject(readEvent.chat); + const threadExternalId = normalizeId(readChat.id); + const businessConnectionId = normalizeString(readEvent.business_connection_id); + const updateId = update.update_id; + const providerEventId = + (updateId != null && requireString(updateId, "")) || makeFallbackEventId(raw); + const occurredAt = isoFromUnix(readEvent.date) ?? receivedAt; + + return { + version: 1, + idempotencyKey: ["telegram_business", providerEventId, "read"].join(":"), + provider: "telegram_business", + channel: "TELEGRAM", + direction: "IN", + providerEventId, + providerMessageId: null, + eventType: "read_business_message", + occurredAt, + receivedAt, + payloadRaw: raw, + payloadNormalized: { + threadExternalId, + contactExternalId: threadExternalId, + text: null, + businessConnectionId, + }, + }; + } + + const message = pickMessage(update); + const updateId = update.update_id; const messageId = message.message_id; const businessConnection = asObject(update.business_connection);