Files
optovia/prefect/flows/sync_offer.py
2026-01-16 10:51:08 +07:00

179 lines
5.3 KiB
Python

"""
Sync offer node to Optovia Maps graph.
- Upsert offer node
- Create edge offer -> location
- Signal Temporal workflow on completion
"""
import asyncio
import os
from dataclasses import dataclass
from datetime import datetime, timezone

from arango import ArangoClient
from prefect import flow, task, get_run_logger
from temporalio.client import Client as TemporalClient
# Connection settings. Each value is read from the environment variable of the
# same name and falls back to the literal default shown here.
# NOTE(review): the ARANGODB_URL fallback is a hard-coded IP address and the
# password fallback is empty -- confirm these defaults are intended.
ARANGODB_URL = os.getenv("ARANGODB_URL", "http://217.216.32.39:32844")
ARANGODB_DATABASE = os.getenv("ARANGODB_DATABASE", "optovia_maps")
ARANGODB_PASSWORD = os.getenv("ARANGODB_PASSWORD", "")

TEMPORAL_HOST = os.getenv("TEMPORAL_HOST", "temporal:7233")
TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default")
@dataclass
class PrefectSyncCompletedSignal:
    """Payload sent to the Temporal workflow when the Prefect flow finishes.

    Attributes:
        success: Whether the sync succeeded.
        flow_run_id: Prefect flow run id as a string; empty when not supplied.
        message: Free-form detail text; empty when not supplied.
    """

    success: bool
    flow_run_id: str = ""
    message: str = ""
def get_db():
    """Return a database handle for the configured ArangoDB instance.

    Builds a client for ``ARANGODB_URL`` and selects ``ARANGODB_DATABASE``,
    authenticating as user ``root`` with ``ARANGODB_PASSWORD``.
    """
    arango = ArangoClient(hosts=ARANGODB_URL)
    database = arango.db(
        ARANGODB_DATABASE,
        username="root",
        password=ARANGODB_PASSWORD,
    )
    return database
@task
def upsert_offer_node(offer_data: dict) -> str:
    """Insert or update the graph node that represents an offer.

    The document lives in the ``nodes`` collection (created if it does not
    exist yet) and is keyed by the offer UUID.

    Args:
        offer_data: Offer payload. ``uuid`` is required; every other field is
            read with ``dict.get`` and is therefore ``None`` when absent.

    Returns:
        The offer UUID, which is also the node's ``_key``.

    Raises:
        KeyError: If ``offer_data`` has no ``uuid`` entry.
    """
    logger = get_run_logger()
    db = get_db()

    if not db.has_collection("nodes"):
        db.create_collection("nodes")
    nodes = db.collection("nodes")

    offer_uuid = offer_data["uuid"]

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # reading and strip tzinfo so the stored string keeps the exact naive
    # ISO-8601 format that existing documents already use.
    synced_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    doc = {
        "_key": offer_uuid,
        # Fall back to a generated label when the name is missing or empty.
        "name": offer_data.get("name") or f"Offer {offer_uuid}",
        "latitude": offer_data.get("location_latitude"),
        "longitude": offer_data.get("location_longitude"),
        "country": offer_data.get("location_country"),
        "country_code": offer_data.get("location_country_code"),
        "transport_types": [],
        "node_type": "offer",
        "supplier_uuid": offer_data.get("supplier_uuid"),
        "team_uuid": offer_data.get("team_uuid"),
        "product_uuid": offer_data.get("product_uuid"),
        "price_per_unit": offer_data.get("price_per_unit"),
        "currency": offer_data.get("currency"),
        "quantity": offer_data.get("quantity"),
        "unit": offer_data.get("unit"),
        "synced_at": synced_at,
    }

    # NOTE(review): has() followed by insert()/update() is not atomic; two
    # concurrent runs for the same offer could both take the insert branch.
    if nodes.has(offer_uuid):
        nodes.update(doc)
        logger.info("Updated offer node %s", offer_uuid)
    else:
        nodes.insert(doc)
        logger.info("Inserted offer node %s", offer_uuid)

    return offer_uuid
@task(retries=3, retry_delay_seconds=5)
def signal_temporal_completion(
    workflow_id: str,
    success: bool,
    flow_run_id: str,
    message: str = "",
) -> bool:
    """Send the ``prefect_sync_completed`` signal to a Temporal workflow.

    Args:
        workflow_id: Id of the Temporal workflow to signal.
        success: Outcome flag carried in the signal payload.
        flow_run_id: Prefect flow run id carried in the signal payload.
        message: Optional detail text carried in the signal payload.

    Returns:
        ``True`` once the signal call has completed.

    Raises:
        Exception: Any error from connecting or signalling is logged and
            re-raised unchanged.
    """
    log = get_run_logger()

    async def _deliver():
        # Connect, look up the workflow handle, then send the payload.
        temporal = await TemporalClient.connect(
            TEMPORAL_HOST,
            namespace=TEMPORAL_NAMESPACE,
        )
        workflow = temporal.get_workflow_handle(workflow_id)
        payload = PrefectSyncCompletedSignal(
            success=success,
            flow_run_id=flow_run_id,
            message=message,
        )
        await workflow.signal("prefect_sync_completed", payload)

    try:
        # The task itself is synchronous, so drive the coroutine to completion here.
        asyncio.run(_deliver())
        log.info("Signaled Temporal workflow %s: success=%s", workflow_id, success)
        return True
    except Exception as exc:
        log.error("Failed to signal Temporal workflow %s: %s", workflow_id, exc)
        raise
@task
def upsert_offer_edge(offer_uuid: str, location_uuid: str) -> bool:
    """Insert or update the edge that links an offer node to a location node.

    The edge lives in the ``edges`` edge collection (created if it does not
    exist yet) under the key ``"{offer_uuid}_{location_uuid}_offer"`` and
    points from ``nodes/{offer_uuid}`` to ``nodes/{location_uuid}``.

    Args:
        offer_uuid: Key of the offer node (edge source).
        location_uuid: Key of the location node (edge target).

    Returns:
        ``True`` once the edge has been written.
    """
    logger = get_run_logger()
    db = get_db()

    if not db.has_collection("edges"):
        db.create_collection("edges", edge=True)
    edges = db.collection("edges")

    edge_key = f"{offer_uuid}_{location_uuid}_offer"

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # reading and strip tzinfo so the stored string keeps the exact naive
    # ISO-8601 format that existing documents already use.
    calculated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    doc = {
        "_key": edge_key,
        "_from": f"nodes/{offer_uuid}",
        "_to": f"nodes/{location_uuid}",
        "type": "offer",
        "transport_type": "offer",
        "distance_km": 0,
        "travel_time_seconds": 0,
        "source": "offer",
        "priority": 1,
        "calculated_at": calculated_at,
    }

    # NOTE(review): has() followed by insert()/update() is not atomic; two
    # concurrent runs for the same pair could both take the insert branch.
    if edges.has(edge_key):
        edges.update(doc)
        logger.info("Updated offer edge %s -> %s", offer_uuid, location_uuid)
    else:
        edges.insert(doc)
        logger.info("Created offer edge %s -> %s", offer_uuid, location_uuid)

    return True
@flow(name="sync-offer", log_prints=True)
def sync_offer(offer_data: dict):
    """Sync offer to graph database and signal Temporal workflow.

    Upserts the offer node and its offer -> location edge. When
    ``offer_data`` carries a ``workflow_id``, the outcome (success or failure)
    is signalled to that Temporal workflow before the flow returns or raises.

    Args:
        offer_data: Offer payload. Must contain non-empty ``uuid`` and
            ``location_uuid``; ``workflow_id`` is optional.

    Returns:
        ``{"offer_uuid": <uuid>, "status": "ok"}`` on success.

    Raises:
        ValueError: If ``uuid`` or ``location_uuid`` is missing or empty.
        RuntimeError: If either upsert fails; the original exception is
            attached as ``__cause__``.
    """
    from prefect.context import get_run_context

    logger = get_run_logger()
    offer_uuid = offer_data.get("uuid")
    location_uuid = offer_data.get("location_uuid")
    workflow_id = offer_data.get("workflow_id")

    # NOTE(review): this raises before any Temporal signal is sent, even when
    # a workflow_id is present -- confirm the workflow does not wait forever.
    if not offer_uuid or not location_uuid:
        raise ValueError("offer_data must include uuid and location_uuid")

    logger.info("Syncing offer %s to graph", offer_uuid)

    # Keep the caught exception itself (not only its text) so the final
    # RuntimeError can chain to it and preserve the original traceback.
    failure = None
    message = ""
    try:
        upsert_offer_node(offer_data)
        upsert_offer_edge(offer_uuid, location_uuid)
        logger.info("Offer %s synced", offer_uuid)
    except Exception as e:
        failure = e
        message = str(e)
        logger.error("Failed to sync offer %s: %s", offer_uuid, e)

    success = failure is None

    # Signal Temporal workflow if workflow_id provided
    if workflow_id:
        ctx = get_run_context()
        flow_run_id = str(ctx.flow_run.id) if ctx.flow_run else ""
        signal_temporal_completion(workflow_id, success, flow_run_id, message)

    # Fail the flow only after the signal, so Temporal learns about the failure.
    if failure is not None:
        raise RuntimeError(f"Offer sync failed: {message}") from failure

    return {"offer_uuid": offer_uuid, "status": "ok"}