split delivery into standalone dockerized service

This commit is contained in:
Ruslan Bakiev
2026-02-20 12:25:10 +07:00
parent 46cca064df
commit cb7d81e801
15 changed files with 2090 additions and 49 deletions

5
delivery/.dockerignore Normal file
View File

@@ -0,0 +1,5 @@
node_modules
npm-debug.log
.git
.gitignore
.DS_Store

16
delivery/Dockerfile Normal file
View File

@@ -0,0 +1,16 @@
FROM node:22-bookworm-slim
WORKDIR /app/delivery
COPY package*.json ./
RUN npm ci --legacy-peer-deps
COPY prisma ./prisma
RUN npx prisma generate
COPY src ./src
COPY tsconfig.json ./tsconfig.json
ENV NODE_ENV=production
CMD ["npm", "run", "start"]

View File

@@ -1,8 +1,8 @@
services:
delivery:
build:
context: ../frontend
dockerfile: Dockerfile.worker
context: .
dockerfile: Dockerfile
environment:
DATABASE_URL: "${DATABASE_URL:-postgresql://postgres:dpb6gmj1umjhohso@crm-sql-q57r8m:5432/postgres?schema=public}"
REDIS_URL: "${REDIS_URL:-redis://default:nw0mv1pemhnbh7gw@crm-redis-vkpxku:6379}"

1358
delivery/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

20
delivery/package.json Normal file
View File

@@ -0,0 +1,20 @@
{
"name": "crm-delivery",
"private": true,
"type": "module",
"scripts": {
"db:generate": "prisma generate",
"start": "tsx src/worker.ts"
},
"dependencies": {
"@prisma/client": "^6.16.1",
"bullmq": "^5.58.2",
"ioredis": "^5.7.0"
},
"devDependencies": {
"@types/node": "^22.13.9",
"prisma": "^6.16.1",
"tsx": "^4.20.5",
"typescript": "^5.9.2"
}
}

View File

@@ -0,0 +1,392 @@
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
enum TeamRole {
OWNER
MEMBER
}
enum MessageDirection {
IN
OUT
}
enum MessageChannel {
TELEGRAM
WHATSAPP
INSTAGRAM
PHONE
EMAIL
INTERNAL
}
enum ContactMessageKind {
MESSAGE
CALL
}
enum ChatRole {
USER
ASSISTANT
SYSTEM
}
enum OmniMessageStatus {
PENDING
SENT
FAILED
DELIVERED
READ
}
enum FeedCardDecision {
PENDING
ACCEPTED
REJECTED
}
enum WorkspaceDocumentType {
Regulation
Playbook
Policy
Template
}
model Team {
id String @id @default(cuid())
name String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
members TeamMember[]
contacts Contact[]
calendarEvents CalendarEvent[]
deals Deal[]
conversations ChatConversation[]
chatMessages ChatMessage[]
omniThreads OmniThread[]
omniMessages OmniMessage[]
omniIdentities OmniContactIdentity[]
telegramBusinessConnections TelegramBusinessConnection[]
feedCards FeedCard[]
contactPins ContactPin[]
documents WorkspaceDocument[]
}
model User {
id String @id @default(cuid())
phone String @unique
passwordHash String
email String? @unique
name String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
memberships TeamMember[]
conversations ChatConversation[] @relation("ConversationCreator")
chatMessages ChatMessage[] @relation("ChatAuthor")
}
model TeamMember {
id String @id @default(cuid())
teamId String
userId String
role TeamRole @default(MEMBER)
createdAt DateTime @default(now())
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
@@unique([teamId, userId])
@@index([userId])
}
model Contact {
id String @id @default(cuid())
teamId String
name String
company String?
country String?
location String?
avatarUrl String?
email String?
phone String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
note ContactNote?
messages ContactMessage[]
events CalendarEvent[]
deals Deal[]
feedCards FeedCard[]
pins ContactPin[]
omniThreads OmniThread[]
omniMessages OmniMessage[]
omniIdentities OmniContactIdentity[]
@@index([teamId, updatedAt])
}
model ContactNote {
id String @id @default(cuid())
contactId String @unique
content String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
contact Contact @relation(fields: [contactId], references: [id], onDelete: Cascade)
}
model ContactMessage {
id String @id @default(cuid())
contactId String
kind ContactMessageKind @default(MESSAGE)
direction MessageDirection
channel MessageChannel
content String
audioUrl String?
durationSec Int?
transcriptJson Json?
occurredAt DateTime @default(now())
createdAt DateTime @default(now())
contact Contact @relation(fields: [contactId], references: [id], onDelete: Cascade)
@@index([contactId, occurredAt])
}
model OmniContactIdentity {
id String @id @default(cuid())
teamId String
contactId String
channel MessageChannel
externalId String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
contact Contact @relation(fields: [contactId], references: [id], onDelete: Cascade)
@@unique([teamId, channel, externalId])
@@index([contactId])
@@index([teamId, updatedAt])
}
model OmniThread {
id String @id @default(cuid())
teamId String
contactId String
channel MessageChannel
externalChatId String
businessConnectionId String?
title String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
contact Contact @relation(fields: [contactId], references: [id], onDelete: Cascade)
messages OmniMessage[]
@@unique([teamId, channel, externalChatId, businessConnectionId])
@@index([teamId, updatedAt])
@@index([contactId, updatedAt])
}
model OmniMessage {
id String @id @default(cuid())
teamId String
contactId String
threadId String
direction MessageDirection
channel MessageChannel
status OmniMessageStatus @default(PENDING)
text String
providerMessageId String?
providerUpdateId String?
rawJson Json?
occurredAt DateTime @default(now())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
contact Contact @relation(fields: [contactId], references: [id], onDelete: Cascade)
thread OmniThread @relation(fields: [threadId], references: [id], onDelete: Cascade)
@@unique([threadId, providerMessageId])
@@index([teamId, occurredAt])
@@index([threadId, occurredAt])
}
model TelegramBusinessConnection {
id String @id @default(cuid())
teamId String
businessConnectionId String
isEnabled Boolean?
canReply Boolean?
rawJson Json?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
@@unique([teamId, businessConnectionId])
@@index([teamId, updatedAt])
}
model CalendarEvent {
id String @id @default(cuid())
teamId String
contactId String?
title String
startsAt DateTime
endsAt DateTime?
note String?
isArchived Boolean @default(false)
archiveNote String?
archivedAt DateTime?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
contact Contact? @relation(fields: [contactId], references: [id], onDelete: SetNull)
@@index([startsAt])
@@index([contactId, startsAt])
@@index([teamId, startsAt])
}
model Deal {
id String @id @default(cuid())
teamId String
contactId String
title String
stage String
amount Int?
nextStep String?
summary String?
currentStepId String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
contact Contact @relation(fields: [contactId], references: [id], onDelete: Cascade)
steps DealStep[]
@@index([teamId, updatedAt])
@@index([contactId, updatedAt])
@@index([currentStepId])
}
model DealStep {
id String @id @default(cuid())
dealId String
title String
description String?
status String @default("todo")
dueAt DateTime?
order Int @default(0)
completedAt DateTime?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
deal Deal @relation(fields: [dealId], references: [id], onDelete: Cascade)
@@index([dealId, order])
@@index([status, dueAt])
}
model ChatConversation {
id String @id @default(cuid())
teamId String
createdByUserId String
title String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
createdByUser User @relation("ConversationCreator", fields: [createdByUserId], references: [id], onDelete: Cascade)
messages ChatMessage[]
@@index([teamId, updatedAt])
@@index([createdByUserId])
}
model ChatMessage {
id String @id @default(cuid())
teamId String
conversationId String
authorUserId String?
role ChatRole
text String
planJson Json?
createdAt DateTime @default(now())
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
conversation ChatConversation @relation(fields: [conversationId], references: [id], onDelete: Cascade)
authorUser User? @relation("ChatAuthor", fields: [authorUserId], references: [id], onDelete: SetNull)
@@index([createdAt])
@@index([teamId, createdAt])
@@index([conversationId, createdAt])
}
model FeedCard {
id String @id @default(cuid())
teamId String
contactId String?
happenedAt DateTime
text String
proposalJson Json
decision FeedCardDecision @default(PENDING)
decisionNote String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
contact Contact? @relation(fields: [contactId], references: [id], onDelete: SetNull)
@@index([teamId, happenedAt])
@@index([contactId, happenedAt])
}
model ContactPin {
id String @id @default(cuid())
teamId String
contactId String
text String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
contact Contact @relation(fields: [contactId], references: [id], onDelete: Cascade)
@@index([teamId, updatedAt])
@@index([contactId, updatedAt])
}
model WorkspaceDocument {
id String @id @default(cuid())
teamId String
title String
type WorkspaceDocumentType
owner String
scope String
summary String
body String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
team Team @relation(fields: [teamId], references: [id], onDelete: Cascade)
@@index([teamId, updatedAt])
}

View File

@@ -0,0 +1,210 @@
import { Queue, Worker, type Job, type JobsOptions, type ConnectionOptions } from "bullmq";
import { Prisma } from "@prisma/client";
import { prisma } from "../utils/prisma";
export const OUTBOUND_DELIVERY_QUEUE_NAME = "omni-outbound";
export type OutboundDeliveryJob = {
omniMessageId: string;
endpoint: string;
method?: "POST" | "PUT" | "PATCH";
headers?: Record<string, string>;
payload: unknown;
channel?: string;
provider?: string;
};
function redisConnectionFromEnv(): ConnectionOptions {
const raw = (process.env.REDIS_URL || "redis://localhost:6379").trim();
const parsed = new URL(raw);
return {
host: parsed.hostname,
port: parsed.port ? Number(parsed.port) : 6379,
username: parsed.username ? decodeURIComponent(parsed.username) : undefined,
password: parsed.password ? decodeURIComponent(parsed.password) : undefined,
db: parsed.pathname && parsed.pathname !== "/" ? Number(parsed.pathname.slice(1)) : undefined,
maxRetriesPerRequest: null,
};
}
function ensureHttpUrl(value: string) {
const raw = (value ?? "").trim();
if (!raw) throw new Error("endpoint is required");
const parsed = new URL(raw);
if (parsed.protocol !== "http:" && parsed.protocol !== "https:") {
throw new Error(`Unsupported endpoint protocol: ${parsed.protocol}`);
}
return parsed.toString();
}
function compactError(error: unknown) {
if (!error) return "unknown_error";
if (typeof error === "string") return error;
const anyErr = error as any;
return String(anyErr?.message ?? anyErr);
}
function extractProviderMessageId(body: unknown): string | null {
const obj = body as any;
if (!obj || typeof obj !== "object") return null;
const candidate =
obj?.message_id ??
obj?.messageId ??
obj?.id ??
obj?.result?.message_id ??
obj?.result?.id ??
null;
if (candidate == null) return null;
return String(candidate);
}
export function outboundDeliveryQueue() {
return new Queue<OutboundDeliveryJob, unknown, "deliver">(OUTBOUND_DELIVERY_QUEUE_NAME, {
connection: redisConnectionFromEnv(),
defaultJobOptions: {
removeOnComplete: { count: 1000 },
removeOnFail: { count: 5000 },
},
});
}
export async function enqueueOutboundDelivery(input: OutboundDeliveryJob, opts?: JobsOptions) {
const endpoint = ensureHttpUrl(input.endpoint);
const q = outboundDeliveryQueue();
const payload = (input.payload ?? null) as Prisma.InputJsonValue;
await prisma.omniMessage.update({
where: { id: input.omniMessageId },
data: {
status: "PENDING",
rawJson: {
queue: {
queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
enqueuedAt: new Date().toISOString(),
},
deliveryRequest: {
endpoint,
method: input.method ?? "POST",
channel: input.channel ?? null,
provider: input.provider ?? null,
payload,
},
},
},
});
return q.add("deliver", { ...input, endpoint }, {
jobId: `omni:${input.omniMessageId}`,
attempts: 12,
backoff: { type: "exponential", delay: 1000 },
...opts,
});
}
export function startOutboundDeliveryWorker() {
return new Worker<OutboundDeliveryJob, unknown, "deliver">(
OUTBOUND_DELIVERY_QUEUE_NAME,
async (job: Job<OutboundDeliveryJob, unknown, "deliver">) => {
const msg = await prisma.omniMessage.findUnique({
where: { id: job.data.omniMessageId },
include: { thread: true },
});
if (!msg) return;
if ((msg.status === "SENT" || msg.status === "DELIVERED" || msg.status === "READ") && msg.providerMessageId) {
return;
}
const endpoint = ensureHttpUrl(job.data.endpoint);
const method = job.data.method ?? "POST";
const headers: Record<string, string> = {
"content-type": "application/json",
...(job.data.headers ?? {}),
};
const requestPayload = (job.data.payload ?? null) as Prisma.InputJsonValue;
const requestStartedAt = new Date().toISOString();
try {
const response = await fetch(endpoint, {
method,
headers,
body: JSON.stringify(requestPayload ?? {}),
});
const text = await response.text();
const responseBody = (() => {
try {
return JSON.parse(text);
} catch {
return text;
}
})();
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${typeof responseBody === "string" ? responseBody : JSON.stringify(responseBody)}`);
}
const providerMessageId = extractProviderMessageId(responseBody);
await prisma.omniMessage.update({
where: { id: msg.id },
data: {
status: "SENT",
providerMessageId,
rawJson: {
queue: {
queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
completedAt: new Date().toISOString(),
attemptsMade: job.attemptsMade + 1,
},
deliveryRequest: {
endpoint,
method,
channel: job.data.channel ?? null,
provider: job.data.provider ?? null,
startedAt: requestStartedAt,
payload: requestPayload,
},
deliveryResponse: {
status: response.status,
body: responseBody,
},
},
},
});
} catch (error) {
const isLastAttempt =
typeof job.opts.attempts === "number" && job.attemptsMade + 1 >= job.opts.attempts;
if (isLastAttempt) {
await prisma.omniMessage.update({
where: { id: msg.id },
data: {
status: "FAILED",
rawJson: {
queue: {
queueName: OUTBOUND_DELIVERY_QUEUE_NAME,
failedAt: new Date().toISOString(),
attemptsMade: job.attemptsMade + 1,
},
deliveryRequest: {
endpoint,
method,
channel: job.data.channel ?? null,
provider: job.data.provider ?? null,
startedAt: requestStartedAt,
payload: requestPayload,
},
deliveryError: {
message: compactError(error),
},
},
},
});
}
throw error;
}
},
{ connection: redisConnectionFromEnv() },
);
}

View File

@@ -0,0 +1,16 @@
import { PrismaClient } from "@prisma/client";
declare global {
// eslint-disable-next-line no-var
var __prisma: PrismaClient | undefined;
}
export const prisma =
globalThis.__prisma ??
new PrismaClient({
log: ["error", "warn"],
});
if (process.env.NODE_ENV !== "production") {
globalThis.__prisma = prisma;
}

View File

@@ -0,0 +1,21 @@
import Redis, { type Redis as RedisClient } from "ioredis";
declare global {
// eslint-disable-next-line no-var
var __redis: RedisClient | undefined;
}
export function getRedis() {
if (globalThis.__redis) return globalThis.__redis;
const url = process.env.REDIS_URL || "redis://localhost:6379";
const client = new Redis(url, {
maxRetriesPerRequest: null,
});
if (process.env.NODE_ENV !== "production") {
globalThis.__redis = client;
}
return client;
}

34
delivery/src/worker.ts Normal file
View File

@@ -0,0 +1,34 @@
import { startOutboundDeliveryWorker } from "./queues/outboundDelivery";
import { prisma } from "./utils/prisma";
import { getRedis } from "./utils/redis";
const worker = startOutboundDeliveryWorker();
console.log("[delivery-worker] started queue omni:outbound");
async function shutdown(signal: string) {
console.log(`[delivery-worker] shutting down by ${signal}`);
try {
await worker.close();
} catch {
// ignore shutdown errors
}
try {
const redis = getRedis();
await redis.quit();
} catch {
// ignore shutdown errors
}
try {
await prisma.$disconnect();
} catch {
// ignore shutdown errors
}
process.exit(0);
}
process.on("SIGINT", () => {
void shutdown("SIGINT");
});
process.on("SIGTERM", () => {
void shutdown("SIGTERM");
});

14
delivery/tsconfig.json Normal file
View File

@@ -0,0 +1,14 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "Bundler",
"strict": true,
"skipLibCheck": true,
"esModuleInterop": true,
"types": ["node"],
"resolveJsonModule": true,
"verbatimModuleSyntax": true
},
"include": ["src/**/*.ts"]
}

View File

@@ -45,8 +45,8 @@ services:
delivery:
build:
context: ./frontend
dockerfile: Dockerfile.worker
context: ./delivery
dockerfile: Dockerfile
environment:
DATABASE_URL: "${DATABASE_URL:-postgresql://postgres:dpb6gmj1umjhohso@crm-sql-q57r8m:5432/postgres?schema=public}"
REDIS_URL: "${REDIS_URL:-redis://default:nw0mv1pemhnbh7gw@crm-redis-vkpxku:6379}"

View File

@@ -1,15 +0,0 @@
FROM node:22-bookworm-slim
WORKDIR /app/delivery
COPY package*.json ./
# Worker does not need Nuxt postinstall hooks.
RUN npm install --ignore-scripts --legacy-peer-deps
COPY . .
RUN npx prisma generate
ENV NODE_ENV=production
CMD ["npm", "run", "worker:delivery"]

View File

@@ -14,7 +14,6 @@
"postinstall": "nuxt prepare && prisma generate",
"preview": "nuxt preview",
"typecheck": "nuxt typecheck",
"worker:delivery": "tsx server/queues/worker.ts",
"codegen": "graphql-codegen --config codegen.ts",
"storybook": "storybook dev -p 6006",
"storybook:build": "storybook build"

View File

@@ -1,29 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
cd "$(dirname "$0")/.."
# Worker container starts from clean image.
# Install deps without frontend postinstall hooks (nuxt prepare) to keep worker lean/stable.
npm install --ignore-scripts --legacy-peer-deps
ARCH="$(uname -m)"
if [ "$ARCH" = "aarch64" ] || [ "$ARCH" = "arm64" ]; then
npm rebuild sharp --platform=linux --arch=arm64v8 \
|| npm rebuild sharp --platform=linux --arch=arm64 \
|| npm install sharp --platform=linux --arch=arm64v8 --save-exact=false \
|| npm install sharp --platform=linux --arch=arm64 --save-exact=false
elif [ "$ARCH" = "x86_64" ] || [ "$ARCH" = "amd64" ]; then
npm rebuild sharp --platform=linux --arch=x64 \
|| npm install sharp --platform=linux --arch=x64 --save-exact=false
else
npm rebuild sharp || true
fi
npx prisma generate
# Ensure DB is reachable before the worker starts consuming jobs.
until node -e "const u=new URL(process.env.DATABASE_URL||''); const net=require('net'); const s=net.createConnection({host:u.hostname,port:Number(u.port||5432)}); s.on('connect',()=>{s.end(); process.exit(0);}); s.on('error',()=>process.exit(1));" ; do
echo "Waiting for PostgreSQL..."
sleep 1
done
exec npm run worker:delivery