Files
kyc/kyc_app/schemas/user_schema.py
Ruslan Bakiev 91fb2ec0dc
All checks were successful
Build Docker Image / build (push) Successful in 3m7s
Rename KYC models (Application/Profile) and add public schema with MongoDB
2026-01-21 09:19:37 +07:00

135 lines
4.7 KiB
Python

import graphene
from graphene_django import DjangoObjectType
from django.contrib.contenttypes.models import ContentType
from ..models import KYCApplication, KYCDetailsRussia
from ..temporal import KycWorkflowClient
class KYCApplicationType(DjangoObjectType):
"""GraphQL type for KYC Application (заявка)."""
class Meta:
model = KYCApplication
fields = '__all__'
country_data = graphene.JSONString()
workflow_status = graphene.String()
def resolve_country_data(self, info):
return self.get_country_data()
class KYCDetailsRussiaType(DjangoObjectType):
"""GraphQL type for Russia-specific KYC details."""
class Meta:
model = KYCDetailsRussia
fields = '__all__'
class KYCApplicationRussiaInput(graphene.InputObjectType):
"""Input for creating KYC Application for Russia."""
companyName = graphene.String(required=True)
companyFullName = graphene.String(required=True)
inn = graphene.String(required=True)
kpp = graphene.String()
ogrn = graphene.String()
address = graphene.String(required=True)
bankName = graphene.String(required=True)
bik = graphene.String(required=True)
correspondentAccount = graphene.String()
contactPerson = graphene.String(required=True)
contactEmail = graphene.String(required=True)
contactPhone = graphene.String(required=True)
class CreateKYCApplicationRussia(graphene.Mutation):
"""Create KYC Application for Russian company."""
class Arguments:
input = KYCApplicationRussiaInput(required=True)
kyc_application = graphene.Field(KYCApplicationType)
success = graphene.Boolean()
def mutate(self, info, input):
# Get user_id from JWT token
user_id = getattr(info.context, 'user_id', None)
if not user_id:
raise Exception("Not authenticated")
# 1. Create Russia details
russia_details = KYCDetailsRussia.objects.create(
company_name=input.companyName,
company_full_name=input.companyFullName,
inn=input.inn,
kpp=input.kpp or '',
ogrn=input.ogrn or '',
address=input.address,
bank_name=input.bankName,
bik=input.bik,
correspondent_account=input.correspondentAccount or '',
)
# 2. Create main KYC Application with reference to details
kyc_application = KYCApplication.objects.create(
user_id=user_id,
team_name=input.companyName,
country_code='RU',
contact_person=input.contactPerson,
contact_email=input.contactEmail,
contact_phone=input.contactPhone,
content_type=ContentType.objects.get_for_model(KYCDetailsRussia),
object_id=russia_details.id,
)
# 3. Start Temporal workflow
KycWorkflowClient.start(kyc_application)
return CreateKYCApplicationRussia(kyc_application=kyc_application, success=True)
class UserQuery(graphene.ObjectType):
"""User schema - ID token authentication"""
# Keep old names for backwards compatibility
kyc_requests = graphene.List(KYCApplicationType, description="Get user's KYC applications")
kyc_request = graphene.Field(KYCApplicationType, uuid=graphene.String(required=True))
# New names
kyc_applications = graphene.List(KYCApplicationType, description="Get user's KYC applications")
kyc_application = graphene.Field(KYCApplicationType, uuid=graphene.String(required=True))
def resolve_kyc_requests(self, info):
return self._get_applications(info)
def resolve_kyc_applications(self, info):
return self._get_applications(info)
def _get_applications(self, info):
user_id = getattr(info.context, 'user_id', None)
if not user_id:
return []
return KYCApplication.objects.filter(user_id=user_id)
def resolve_kyc_request(self, info, uuid):
return self._get_application(info, uuid)
def resolve_kyc_application(self, info, uuid):
return self._get_application(info, uuid)
def _get_application(self, info, uuid):
user_id = getattr(info.context, 'user_id', None)
if not user_id:
return None
try:
return KYCApplication.objects.get(uuid=uuid, user_id=user_id)
except KYCApplication.DoesNotExist:
return None
class UserMutation(graphene.ObjectType):
"""User mutations - ID token authentication"""
# Keep old name for backwards compatibility
create_kyc_request_russia = CreateKYCApplicationRussia.Field()
# New name
create_kyc_application_russia = CreateKYCApplicationRussia.Field()
user_schema = graphene.Schema(query=UserQuery, mutation=UserMutation)