# optovia/prefect/flows/sync_offer.py
# Last modified: 2026-02-04 17:22:45 +07:00
"""
Sync offer node to Optovia Maps graph.
- Upsert offer node
- Upsert supplier node (if supplier_uuid provided)
- Create edge offer -> location
- Signal Temporal workflow on completion
"""
import asyncio
import os
from dataclasses import dataclass
from datetime import datetime, timezone

import httpx
from arango import ArangoClient
from prefect import flow, task, get_run_logger
from temporalio.client import Client as TemporalClient
# ArangoDB graph store (Optovia Maps) connection settings.
# NOTE(review): the default URL points at a hard-coded public IP — confirm
# this is intentional and not a leftover placeholder.
ARANGODB_URL = os.environ.get("ARANGODB_URL", "http://217.216.32.39:32844")
ARANGODB_DATABASE = os.environ.get("ARANGODB_DATABASE", "optovia_maps")
ARANGODB_PASSWORD = os.environ.get("ARANGODB_PASSWORD", "")
# Temporal server used to signal workflow completion.
TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "temporal:7233")
TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default")
# Exchange public GraphQL endpoint used for supplier profile lookups.
EXCHANGE_GRAPHQL_URL = os.environ.get("EXCHANGE_GRAPHQL_URL", "http://exchange:8000/graphql/public/")
@dataclass
class PrefectSyncCompletedSignal:
    """Signal data when Prefect flow completes.

    Payload sent to the Temporal workflow via the
    "prefect_sync_completed" signal (see signal_temporal_completion).
    """
    # True when the sync flow finished without raising.
    success: bool
    # Prefect flow-run ID for traceability; empty when unavailable.
    flow_run_id: str = ""
    # Error detail on failure; empty string on success.
    message: str = ""
def get_db():
    """Return a handle to the Optovia Maps ArangoDB database (root user)."""
    arango_client = ArangoClient(hosts=ARANGODB_URL)
    return arango_client.db(
        ARANGODB_DATABASE, username="root", password=ARANGODB_PASSWORD
    )
@task
def upsert_offer_node(offer_data: dict) -> str:
    """Insert or update the graph node representing an offer.

    Args:
        offer_data: Offer payload; must contain "uuid". All other keys are
            optional and copied onto the node document when present.

    Returns:
        The offer UUID used as the node's ``_key``.
    """
    logger = get_run_logger()
    db = get_db()
    # Collection may not exist yet on a fresh database.
    if not db.has_collection("nodes"):
        db.create_collection("nodes")
    nodes = db.collection("nodes")
    offer_uuid = offer_data["uuid"]
    doc = {
        "_key": offer_uuid,
        "name": offer_data.get("name") or f"Offer {offer_uuid}",
        "latitude": offer_data.get("location_latitude"),
        "longitude": offer_data.get("location_longitude"),
        "country": offer_data.get("location_country"),
        "country_code": offer_data.get("location_country_code"),
        "transport_types": [],
        "node_type": "offer",
        "supplier_uuid": offer_data.get("supplier_uuid"),
        "team_uuid": offer_data.get("team_uuid"),
        "product_uuid": offer_data.get("product_uuid"),
        "product_name": offer_data.get("product_name"),
        "price_per_unit": offer_data.get("price_per_unit"),
        "currency": offer_data.get("currency"),
        "quantity": offer_data.get("quantity"),
        "unit": offer_data.get("unit"),
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
        # (Python 3.12+) and produced a naive, ambiguous value.
        "synced_at": datetime.now(timezone.utc).isoformat(),
    }
    if nodes.has(offer_uuid):
        nodes.update(doc)
        logger.info("Updated offer node %s", offer_uuid)
    else:
        nodes.insert(doc)
        logger.info("Inserted offer node %s", offer_uuid)
    return offer_uuid
@task(retries=3, retry_delay_seconds=5)
def signal_temporal_completion(
    workflow_id: str,
    success: bool,
    flow_run_id: str,
    message: str = "",
) -> bool:
    """Notify the waiting Temporal workflow that the Prefect flow finished.

    Delivers a "prefect_sync_completed" signal carrying the outcome;
    retried up to 3 times by Prefect on failure.
    """
    logger = get_run_logger()

    async def _deliver() -> None:
        # Temporal's client is async-only; bridge from this sync task.
        temporal = await TemporalClient.connect(
            TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE
        )
        payload = PrefectSyncCompletedSignal(
            success=success,
            flow_run_id=flow_run_id,
            message=message,
        )
        handle = temporal.get_workflow_handle(workflow_id)
        await handle.signal("prefect_sync_completed", payload)

    try:
        asyncio.run(_deliver())
        logger.info("Signaled Temporal workflow %s: success=%s", workflow_id, success)
        return True
    except Exception as e:
        # Re-raise so Prefect's retry policy kicks in.
        logger.error("Failed to signal Temporal workflow %s: %s", workflow_id, e)
        raise
@task
def fetch_supplier_from_exchange(supplier_uuid: str | None, team_uuid: str | None) -> dict | None:
    """Fetch supplier profile from Exchange GraphQL.

    Looks up by supplier UUID when given, otherwise by team UUID.

    Args:
        supplier_uuid: Supplier profile UUID, if known.
        team_uuid: Team UUID fallback used when supplier_uuid is absent.

    Returns:
        The supplier dict from Exchange, or None when both identifiers are
        missing, the supplier is not found, or the request fails. Never raises.
    """
    logger = get_run_logger()
    if not supplier_uuid and not team_uuid:
        logger.warning("Supplier UUID and team UUID are missing")
        return None
    # Both lookups return the same supplier fields; only the entry point
    # and argument differ, so build the query from one shared field list.
    fields = "uuid teamUuid name country countryCode latitude longitude isVerified"
    if supplier_uuid:
        query = (
            "query GetSupplier($uuid: String!) { "
            f"getSupplierProfile(uuid: $uuid) {{ {fields} }} }}"
        )
        variables = {"uuid": supplier_uuid}
        response_key = "getSupplierProfile"
    else:
        query = (
            "query GetSupplierByTeam($teamUuid: String!) { "
            f"getSupplierProfileByTeam(teamUuid: $teamUuid) {{ {fields} }} }}"
        )
        variables = {"teamUuid": team_uuid}
        response_key = "getSupplierProfileByTeam"
    try:
        with httpx.Client(timeout=10) as client:
            response = client.post(
                EXCHANGE_GRAPHQL_URL,
                json={"query": query, "variables": variables},
            )
        if response.status_code == 200:
            data = response.json()
            # GraphQL reports failures in-band with HTTP 200 and may set
            # "data" to null — surface errors instead of crashing on None.
            if data.get("errors"):
                logger.error("Exchange GraphQL errors: %s", data["errors"])
                return None
            supplier = (data.get("data") or {}).get(response_key)
            if supplier:
                logger.info("Fetched supplier %s from Exchange", supplier.get("uuid"))
                return supplier
            logger.warning("Supplier not found in Exchange")
        else:
            logger.error("Failed to fetch supplier: %s", response.status_code)
    except Exception as e:
        # Best-effort lookup: callers treat None as "no supplier data".
        logger.error("Error fetching supplier: %s", e)
    return None
@task
def upsert_supplier_node(supplier_data: dict) -> str | None:
    """Insert or update the graph node representing a supplier.

    Args:
        supplier_data: Supplier payload; must contain "uuid". Accepts both
            snake_case and camelCase keys (Exchange GraphQL returns camelCase).

    Returns:
        The supplier UUID used as the node's ``_key``, or None when the
        supplier has no coordinates (a node without a location is useless
        on the map, so it is skipped).
    """
    logger = get_run_logger()
    lat = supplier_data.get("latitude")
    lng = supplier_data.get("longitude")
    if lat is None or lng is None:
        logger.warning("Supplier %s has no coordinates, skipping", supplier_data.get("uuid"))
        return None
    db = get_db()
    # Collection may not exist yet on a fresh database.
    if not db.has_collection("nodes"):
        db.create_collection("nodes")
    nodes = db.collection("nodes")
    supplier_uuid = supplier_data["uuid"]
    doc = {
        "_key": supplier_uuid,
        "name": supplier_data.get("name") or f"Supplier {supplier_uuid}",
        "latitude": lat,
        "longitude": lng,
        "country": supplier_data.get("country"),
        "country_code": supplier_data.get("country_code") or supplier_data.get("countryCode"),
        "transport_types": [],
        "node_type": "supplier",
        "team_uuid": supplier_data.get("team_uuid") or supplier_data.get("teamUuid"),
        "is_verified": supplier_data.get("is_verified", supplier_data.get("isVerified", False)),
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
        # (Python 3.12+) and produced a naive, ambiguous value.
        "synced_at": datetime.now(timezone.utc).isoformat(),
    }
    if nodes.has(supplier_uuid):
        nodes.update(doc)
        logger.info("Updated supplier node %s", supplier_uuid)
    else:
        nodes.insert(doc)
        logger.info("Inserted supplier node %s", supplier_uuid)
    return supplier_uuid
@task
def upsert_offer_edge(offer_uuid: str, location_uuid: str) -> bool:
    """Create or refresh the zero-cost edge linking an offer to its location.

    Args:
        offer_uuid: ``_key`` of the offer node (edge source).
        location_uuid: ``_key`` of the location node (edge target).

    Returns:
        True (always; ArangoDB errors propagate to the caller).
    """
    logger = get_run_logger()
    db = get_db()
    # Edge collection may not exist yet on a fresh database.
    if not db.has_collection("edges"):
        db.create_collection("edges", edge=True)
    edges = db.collection("edges")
    # Deterministic key makes repeated syncs idempotent.
    edge_key = f"{offer_uuid}_{location_uuid}_offer"
    doc = {
        "_key": edge_key,
        "_from": f"nodes/{offer_uuid}",
        "_to": f"nodes/{location_uuid}",
        "type": "offer",
        "transport_type": "offer",
        # Offer sits at its location: no distance or travel time.
        "distance_km": 0,
        "travel_time_seconds": 0,
        "source": "offer",
        "priority": 1,
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
        # (Python 3.12+) and produced a naive, ambiguous value.
        "calculated_at": datetime.now(timezone.utc).isoformat(),
    }
    if edges.has(edge_key):
        edges.update(doc)
        logger.info("Updated offer edge %s -> %s", offer_uuid, location_uuid)
    else:
        edges.insert(doc)
        logger.info("Created offer edge %s -> %s", offer_uuid, location_uuid)
    return True
@flow(name="sync-offer", log_prints=True)
def sync_offer(offer_data: dict):
    """Sync offer to graph database and signal Temporal workflow.

    Upserts the offer node and its location edge, best-effort syncs the
    supplier node, then (when a workflow_id is supplied) signals the waiting
    Temporal workflow with the outcome before re-raising any failure.
    """
    from prefect.context import get_run_context

    logger = get_run_logger()
    offer_uuid = offer_data.get("uuid")
    location_uuid = offer_data.get("location_uuid")
    workflow_id = offer_data.get("workflow_id")

    # Guard: both identifiers are required to place the offer on the graph.
    if not offer_uuid or not location_uuid:
        raise ValueError("offer_data must include uuid and location_uuid")

    logger.info("Syncing offer %s to graph", offer_uuid)
    success, message = True, ""
    try:
        upsert_offer_node(offer_data)
        upsert_offer_edge(offer_uuid, location_uuid)
        # Supplier sync is best-effort: skip silently when no profile found.
        supplier = fetch_supplier_from_exchange(
            offer_data.get("supplier_uuid"),
            offer_data.get("team_uuid"),
        )
        if supplier:
            upsert_supplier_node(supplier)
        logger.info("Offer %s synced", offer_uuid)
    except Exception as exc:
        success, message = False, str(exc)
        logger.error("Failed to sync offer %s: %s", offer_uuid, exc)

    # Report the outcome (success or failure) back to Temporal, if asked to.
    if workflow_id:
        run_context = get_run_context()
        run_id = str(run_context.flow_run.id) if run_context.flow_run else ""
        signal_temporal_completion(workflow_id, success, run_id, message)

    if not success:
        raise RuntimeError(f"Offer sync failed: {message}")
    return {"offer_uuid": offer_uuid, "status": "ok"}