169 lines
4.6 KiB
Python
169 lines
4.6 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import uuid
|
|
from dataclasses import asdict
|
|
from typing import Tuple
|
|
|
|
from temporalio.client import Client
|
|
|
|
from .models import Team
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Default Temporal connection settings; override via env.
|
|
TEMPORAL_ADDRESS = os.getenv("TEMPORAL_ADDRESS", "temporal:7233")
|
|
TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default")
|
|
TEMPORAL_TASK_QUEUE = os.getenv("TEMPORAL_TASK_QUEUE", "platform-worker")
|
|
|
|
|
|
async def _start_team_created_async(team: Team) -> Tuple[str, str]:
|
|
"""
|
|
Start the team_created workflow in Temporal and return (workflow_id, run_id).
|
|
"""
|
|
client = await Client.connect(TEMPORAL_ADDRESS, namespace=TEMPORAL_NAMESPACE)
|
|
|
|
# We re-use team.uuid as workflow_id to keep idempotency.
|
|
handle = await client.start_workflow(
|
|
"team_created_workflow", # workflow name registered in worker
|
|
{
|
|
"team_id": team.uuid,
|
|
"team_name": team.name,
|
|
"owner_id": getattr(getattr(team.owner, "profile", None), "logto_id", "") or getattr(team.owner, "username", ""),
|
|
"logto_org_id": team.logto_org_id or "",
|
|
},
|
|
id=team.uuid,
|
|
task_queue=TEMPORAL_TASK_QUEUE,
|
|
)
|
|
return handle.id, handle.run_id
|
|
|
|
|
|
def start_team_created(team: Team) -> Tuple[str, str]:
|
|
"""
|
|
Sync wrapper for Django mutation handlers.
|
|
"""
|
|
try:
|
|
return asyncio.run(_start_team_created_async(team))
|
|
except Exception:
|
|
logger.exception("Failed to start Temporal workflow for team %s", team.uuid)
|
|
raise
|
|
|
|
|
|
async def _start_address_workflow_async(
|
|
team_uuid: str,
|
|
name: str,
|
|
address: str,
|
|
latitude: float | None = None,
|
|
longitude: float | None = None,
|
|
country_code: str | None = None,
|
|
is_default: bool = False,
|
|
) -> Tuple[str, str]:
|
|
"""
|
|
Start the create_address workflow in Temporal.
|
|
Returns (workflow_id, run_id).
|
|
"""
|
|
client = await Client.connect(TEMPORAL_ADDRESS, namespace=TEMPORAL_NAMESPACE)
|
|
|
|
workflow_id = f"address-{uuid.uuid4()}"
|
|
|
|
handle = await client.start_workflow(
|
|
"create_address",
|
|
{
|
|
"workflow_id": workflow_id,
|
|
"team_uuid": team_uuid,
|
|
"name": name,
|
|
"address": address,
|
|
"latitude": latitude,
|
|
"longitude": longitude,
|
|
"country_code": country_code,
|
|
"is_default": is_default,
|
|
},
|
|
id=workflow_id,
|
|
task_queue=TEMPORAL_TASK_QUEUE,
|
|
)
|
|
|
|
logger.info("Started address workflow %s for team %s", workflow_id, team_uuid)
|
|
return handle.id, handle.result_run_id
|
|
|
|
|
|
def start_address_workflow(
|
|
team_uuid: str,
|
|
name: str,
|
|
address: str,
|
|
latitude: float | None = None,
|
|
longitude: float | None = None,
|
|
country_code: str | None = None,
|
|
is_default: bool = False,
|
|
) -> Tuple[str, str]:
|
|
"""
|
|
Sync wrapper for starting address workflow.
|
|
"""
|
|
try:
|
|
return asyncio.run(_start_address_workflow_async(
|
|
team_uuid=team_uuid,
|
|
name=name,
|
|
address=address,
|
|
latitude=latitude,
|
|
longitude=longitude,
|
|
country_code=country_code,
|
|
is_default=is_default,
|
|
))
|
|
except Exception:
|
|
logger.exception("Failed to start address workflow for team %s", team_uuid)
|
|
raise
|
|
|
|
|
|
async def _start_invite_workflow_async(
|
|
team_uuid: str,
|
|
email: str,
|
|
role: str,
|
|
invited_by: str,
|
|
expires_at: str,
|
|
) -> Tuple[str, str]:
|
|
"""
|
|
Start the invite_user workflow in Temporal.
|
|
Returns (workflow_id, run_id).
|
|
"""
|
|
client = await Client.connect(TEMPORAL_ADDRESS, namespace=TEMPORAL_NAMESPACE)
|
|
|
|
workflow_id = f"invite-{uuid.uuid4()}"
|
|
|
|
handle = await client.start_workflow(
|
|
"invite_user",
|
|
{
|
|
"team_uuid": team_uuid,
|
|
"email": email,
|
|
"role": role,
|
|
"invited_by": invited_by,
|
|
"expires_at": expires_at,
|
|
},
|
|
id=workflow_id,
|
|
task_queue=TEMPORAL_TASK_QUEUE,
|
|
)
|
|
|
|
logger.info("Started invite workflow %s for %s", workflow_id, email)
|
|
return handle.id, handle.result_run_id
|
|
|
|
|
|
def start_invite_workflow(
|
|
team_uuid: str,
|
|
email: str,
|
|
role: str,
|
|
invited_by: str,
|
|
expires_at: str,
|
|
) -> Tuple[str, str]:
|
|
"""
|
|
Sync wrapper for starting invite workflow.
|
|
"""
|
|
try:
|
|
return asyncio.run(_start_invite_workflow_async(
|
|
team_uuid=team_uuid,
|
|
email=email,
|
|
role=role,
|
|
invited_by=invited_by,
|
|
expires_at=expires_at,
|
|
))
|
|
except Exception:
|
|
logger.exception("Failed to start invite workflow for %s", email)
|
|
raise
|