"""Helpers that start Temporal workflows from synchronous (Django) code."""

import asyncio
import logging
import os
import uuid
from dataclasses import asdict
from typing import Tuple

from temporalio.client import Client

from .models import Team

logger = logging.getLogger(__name__)

# Default Temporal connection settings; override via env.
TEMPORAL_ADDRESS = os.getenv("TEMPORAL_ADDRESS", "temporal:7233")
TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default")
TEMPORAL_TASK_QUEUE = os.getenv("TEMPORAL_TASK_QUEUE", "platform-worker")


async def _start_team_created_async(team: Team) -> Tuple[str, str]:
    """
    Start the team_created workflow in Temporal.

    Args:
        team: Team whose ``uuid``, ``name``, ``owner`` and ``logto_org_id``
            are sent as the workflow input.

    Returns:
        ``(workflow_id, run_id)`` of the started workflow.
    """
    client = await Client.connect(TEMPORAL_ADDRESS, namespace=TEMPORAL_NAMESPACE)

    # Prefer the owner's Logto id from the profile; fall back to the username
    # when the profile or its logto_id is missing/empty.
    owner_profile = getattr(team.owner, "profile", None)
    owner_id = getattr(owner_profile, "logto_id", "") or getattr(
        team.owner, "username", ""
    )

    # We re-use team.uuid as workflow_id to keep idempotency.
    handle = await client.start_workflow(
        "team_created_workflow",  # workflow name registered in worker
        {
            "team_id": team.uuid,
            "team_name": team.name,
            "owner_id": owner_id,
            "logto_org_id": team.logto_org_id or "",
        },
        id=team.uuid,
        task_queue=TEMPORAL_TASK_QUEUE,
    )

    # Handles returned by start_workflow expose the started run through
    # result_run_id; handle.run_id is only populated on handles obtained via
    # get_workflow_handle, so it would be None here. This also matches the
    # address and invite starters below.
    return handle.id, handle.result_run_id


def start_team_created(team: Team) -> Tuple[str, str]:
    """
    Sync wrapper for Django mutation handlers.

    Runs the async starter on a fresh event loop via ``asyncio.run``.

    Returns:
        ``(workflow_id, run_id)`` of the started workflow.

    Raises:
        Exception: any failure is logged with its traceback and re-raised.
    """
    try:
        return asyncio.run(_start_team_created_async(team))
    except Exception:
        logger.exception("Failed to start Temporal workflow for team %s", team.uuid)
        raise


async def _start_address_workflow_async(
    team_uuid: str,
    name: str,
    address: str,
    latitude: float | None = None,
    longitude: float | None = None,
    country_code: str | None = None,
    is_default: bool = False,
) -> Tuple[str, str]:
    """
    Start the create_address workflow in Temporal.

    A new ``address-<uuid4>`` workflow id is generated for every call and is
    also included in the workflow input.

    Returns:
        ``(workflow_id, run_id)`` of the started workflow.
    """
    client = await Client.connect(TEMPORAL_ADDRESS, namespace=TEMPORAL_NAMESPACE)

    workflow_id = f"address-{uuid.uuid4()}"

    handle = await client.start_workflow(
        "create_address",
        {
            "workflow_id": workflow_id,
            "team_uuid": team_uuid,
            "name": name,
            "address": address,
            "latitude": latitude,
            "longitude": longitude,
            "country_code": country_code,
            "is_default": is_default,
        },
        id=workflow_id,
        task_queue=TEMPORAL_TASK_QUEUE,
    )

    logger.info("Started address workflow %s for team %s", workflow_id, team_uuid)
    return handle.id, handle.result_run_id


def start_address_workflow(
    team_uuid: str,
    name: str,
    address: str,
    latitude: float | None = None,
    longitude: float | None = None,
    country_code: str | None = None,
    is_default: bool = False,
) -> Tuple[str, str]:
    """
    Sync wrapper for starting address workflow.

    Returns:
        ``(workflow_id, run_id)`` of the started workflow.

    Raises:
        Exception: any failure is logged with its traceback and re-raised.
    """
    try:
        return asyncio.run(
            _start_address_workflow_async(
                team_uuid=team_uuid,
                name=name,
                address=address,
                latitude=latitude,
                longitude=longitude,
                country_code=country_code,
                is_default=is_default,
            )
        )
    except Exception:
        logger.exception("Failed to start address workflow for team %s", team_uuid)
        raise


async def _start_invite_workflow_async(
    team_uuid: str,
    email: str,
    role: str,
    invited_by: str,
    expires_at: str,
) -> Tuple[str, str]:
    """
    Start the invite_user workflow in Temporal.

    A new ``invite-<uuid4>`` workflow id is generated for every call.

    Returns:
        ``(workflow_id, run_id)`` of the started workflow.
    """
    client = await Client.connect(TEMPORAL_ADDRESS, namespace=TEMPORAL_NAMESPACE)

    workflow_id = f"invite-{uuid.uuid4()}"

    handle = await client.start_workflow(
        "invite_user",
        {
            "team_uuid": team_uuid,
            "email": email,
            "role": role,
            "invited_by": invited_by,
            "expires_at": expires_at,
        },
        id=workflow_id,
        task_queue=TEMPORAL_TASK_QUEUE,
    )

    logger.info("Started invite workflow %s for %s", workflow_id, email)
    return handle.id, handle.result_run_id


def start_invite_workflow(
    team_uuid: str,
    email: str,
    role: str,
    invited_by: str,
    expires_at: str,
) -> Tuple[str, str]:
    """
    Sync wrapper for starting invite workflow.

    Returns:
        ``(workflow_id, run_id)`` of the started workflow.

    Raises:
        Exception: any failure is logged with its traceback and re-raised.
    """
    try:
        return asyncio.run(
            _start_invite_workflow_async(
                team_uuid=team_uuid,
                email=email,
                role=role,
                invited_by=invited_by,
                expires_at=expires_at,
            )
        )
    except Exception:
        logger.exception("Failed to start invite workflow for %s", email)
        raise