Files
kyc/kyc_app/temporal/kyc_workflow_client.py
2026-01-07 09:16:05 +07:00

118 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
KYC Workflow Client - контракт взаимодействия с Temporal KYC workflow.
Этот файл содержит методы для запуска KYC workflow из Django.
Approve/reject сигналы отправляются из Odoo.
"""
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Optional
from django.conf import settings
from temporalio.client import Client
logger = logging.getLogger(__name__)
@dataclass
class KycWorkflowData:
"""Данные для запуска KYC workflow."""
kyc_request_id: str
team_name: str
owner_id: str
owner_email: str
country_code: str
country_data: dict = field(default_factory=dict)
# team_id создаётся в workflow после approve, не передаётся сюда
class KycWorkflowClient:
"""
Клиент для запуска KYC Application workflow.
Использование:
KycWorkflowClient.start(kyc_request)
Flow:
1. Django вызывает start() → запускает workflow
2. Workflow добавляет KYC в Odoo, ставит статус KYC_IN_REVIEW
3. Workflow ждёт сигнала approve/reject (из Odoo)
4. После approve → создаёт Logto org, ставит ACTIVE
"""
WORKFLOW_NAME = "kyc_application"
@classmethod
async def _get_client(cls) -> Client:
"""Получить подключение к Temporal."""
return await Client.connect(
settings.TEMPORAL_HOST,
namespace=settings.TEMPORAL_NAMESPACE,
)
@classmethod
async def start_async(cls, data: KycWorkflowData) -> str:
"""
Запустить KYC Application workflow (async).
Returns:
workflow_id: ID запущенного workflow
"""
client = await cls._get_client()
workflow_id = data.kyc_request_id
await client.start_workflow(
cls.WORKFLOW_NAME,
{
"kyc_request_id": data.kyc_request_id,
"team_name": data.team_name,
"owner_id": data.owner_id,
"owner_email": data.owner_email,
"country_code": data.country_code,
"country_data": data.country_data,
},
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
)
logger.info(f"KYC workflow started: {workflow_id}")
return workflow_id
@classmethod
def start(cls, kyc_request) -> Optional[str]:
"""
Запустить KYC Application workflow (sync wrapper).
Args:
kyc_request: KYCRequest model instance (главная модель)
Returns:
workflow_id или None при ошибке
"""
# Собираем данные страны через GenericForeignKey
country_data = kyc_request.get_country_data()
data = KycWorkflowData(
kyc_request_id=str(kyc_request.uuid),
team_name=kyc_request.team_name or country_data.get('company_name', ''),
owner_id=kyc_request.user_id,
owner_email=kyc_request.contact_email,
country_code=kyc_request.country_code,
country_data=country_data,
)
try:
workflow_id = asyncio.run(cls.start_async(data))
kyc_request.workflow_status = "active"
kyc_request.save(update_fields=["workflow_status", "updated_at"])
return workflow_id
except Exception as e:
logger.error(f"Failed to start KYC workflow: {e}")
kyc_request.workflow_status = "error"
kyc_request.save(update_fields=["workflow_status", "updated_at"])
return None