Files
geo/geo_app/schema.py
Ruslan Bakiev 07f89ba5fb
All checks were successful
Build Docker Image / build (push) Successful in 1m24s
refactor(geo): Clean up queries - rename offers_to_hub to offers_by_hub, add offer_to_hub
- Remove find_routes, find_product_routes, delivery_to_hub queries
- Rename offers_to_hub → offers_by_hub with proper phase-based routing (auto → rail* → auto)
- Add offer_to_hub query for single offer to hub connection
- Both new queries use Dijkstra-like search with transport phases
2026-01-16 16:54:00 +07:00

1310 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""GraphQL schema for Geo service."""
import logging
import heapq
import math
import requests
import graphene
from django.conf import settings
from .arango_client import get_db, ensure_graph
from .cluster_index import get_clustered_nodes, invalidate_cache
logger = logging.getLogger(__name__)
class EdgeType(graphene.ObjectType):
"""Edge between two nodes (route)."""
to_uuid = graphene.String()
to_name = graphene.String()
to_latitude = graphene.Float()
to_longitude = graphene.Float()
distance_km = graphene.Float()
travel_time_seconds = graphene.Int()
transport_type = graphene.String()
class NodeType(graphene.ObjectType):
"""Logistics node with edges to neighbors."""
uuid = graphene.String()
name = graphene.String()
latitude = graphene.Float()
longitude = graphene.Float()
country = graphene.String()
country_code = graphene.String()
synced_at = graphene.String()
transport_types = graphene.List(graphene.String)
edges = graphene.List(EdgeType)
class NodeConnectionsType(graphene.ObjectType):
"""Auto + rail edges for a node, rail uses nearest rail node."""
hub = graphene.Field(NodeType)
rail_node = graphene.Field(NodeType)
auto_edges = graphene.List(EdgeType)
rail_edges = graphene.List(EdgeType)
class RouteType(graphene.ObjectType):
"""Route between two points with geometry."""
distance_km = graphene.Float()
geometry = graphene.JSONString(description="GeoJSON LineString coordinates")
class RouteStageType(graphene.ObjectType):
"""Single stage in a multi-hop route."""
from_uuid = graphene.String()
from_name = graphene.String()
from_lat = graphene.Float()
from_lon = graphene.Float()
to_uuid = graphene.String()
to_name = graphene.String()
to_lat = graphene.Float()
to_lon = graphene.Float()
distance_km = graphene.Float()
travel_time_seconds = graphene.Int()
transport_type = graphene.String()
class RoutePathType(graphene.ObjectType):
"""Complete route through graph with multiple stages."""
total_distance_km = graphene.Float()
total_time_seconds = graphene.Int()
stages = graphene.List(RouteStageType)
class ProductRouteOptionType(graphene.ObjectType):
"""Route options for a product source to the destination."""
source_uuid = graphene.String()
source_name = graphene.String()
source_lat = graphene.Float()
source_lon = graphene.Float()
distance_km = graphene.Float()
routes = graphene.List(RoutePathType)
class ClusterPointType(graphene.ObjectType):
"""Cluster or individual point for map display."""
id = graphene.String(description="UUID for points, 'cluster-N' for clusters")
latitude = graphene.Float()
longitude = graphene.Float()
count = graphene.Int(description="1 for single point, >1 for cluster")
expansion_zoom = graphene.Int(description="Zoom level to expand cluster")
name = graphene.String(description="Node name (only for single points)")
class ProductType(graphene.ObjectType):
"""Unique product from offers."""
uuid = graphene.String()
name = graphene.String()
class SupplierType(graphene.ObjectType):
"""Unique supplier from offers."""
uuid = graphene.String()
class OfferNodeType(graphene.ObjectType):
"""Offer node with location and product info."""
uuid = graphene.String()
product_uuid = graphene.String()
product_name = graphene.String()
supplier_uuid = graphene.String()
latitude = graphene.Float()
longitude = graphene.Float()
country = graphene.String()
country_code = graphene.String()
price_per_unit = graphene.String()
currency = graphene.String()
quantity = graphene.String()
unit = graphene.String()
class Query(graphene.ObjectType):
"""Root query."""
MAX_EXPANSIONS = 20000
node = graphene.Field(
NodeType,
uuid=graphene.String(required=True),
description="Get node by UUID with all edges to neighbors",
)
nodes = graphene.List(
NodeType,
description="Get all nodes (without edges for performance)",
limit=graphene.Int(),
offset=graphene.Int(),
transport_type=graphene.String(),
country=graphene.String(description="Filter by country name"),
search=graphene.String(description="Search by node name (case-insensitive)"),
)
nodes_count = graphene.Int(
transport_type=graphene.String(),
country=graphene.String(description="Filter by country name"),
description="Get total count of nodes (with optional transport/country filter)",
)
hub_countries = graphene.List(
graphene.String,
description="List of countries that have logistics hubs",
)
nearest_nodes = graphene.List(
NodeType,
lat=graphene.Float(required=True, description="Latitude"),
lon=graphene.Float(required=True, description="Longitude"),
limit=graphene.Int(default_value=5, description="Max results"),
description="Find nearest logistics nodes to given coordinates",
)
node_connections = graphene.Field(
NodeConnectionsType,
uuid=graphene.String(required=True),
limit_auto=graphene.Int(default_value=12),
limit_rail=graphene.Int(default_value=12),
description="Get auto + rail edges for a node (rail uses nearest rail node)",
)
auto_route = graphene.Field(
RouteType,
from_lat=graphene.Float(required=True),
from_lon=graphene.Float(required=True),
to_lat=graphene.Float(required=True),
to_lon=graphene.Float(required=True),
description="Get auto route between two points via GraphHopper",
)
rail_route = graphene.Field(
RouteType,
from_lat=graphene.Float(required=True),
from_lon=graphene.Float(required=True),
to_lat=graphene.Float(required=True),
to_lon=graphene.Float(required=True),
description="Get rail route between two points via OpenRailRouting",
)
clustered_nodes = graphene.List(
ClusterPointType,
west=graphene.Float(required=True, description="Bounding box west longitude"),
south=graphene.Float(required=True, description="Bounding box south latitude"),
east=graphene.Float(required=True, description="Bounding box east longitude"),
north=graphene.Float(required=True, description="Bounding box north latitude"),
zoom=graphene.Int(required=True, description="Map zoom level (0-16)"),
transport_type=graphene.String(description="Filter by transport type"),
description="Get clustered nodes for map display (server-side clustering)",
)
# Catalog navigation queries
products = graphene.List(
ProductType,
description="Get unique products from all offers",
)
offers_by_product = graphene.List(
OfferNodeType,
product_uuid=graphene.String(required=True),
description="Get all offers for a product",
)
hubs_near_offer = graphene.List(
NodeType,
offer_uuid=graphene.String(required=True),
limit=graphene.Int(default_value=12),
description="Get nearest hubs to an offer location",
)
suppliers = graphene.List(
SupplierType,
description="Get unique suppliers from all offers",
)
products_by_supplier = graphene.List(
ProductType,
supplier_uuid=graphene.String(required=True),
description="Get products offered by a supplier",
)
offers_by_supplier_product = graphene.List(
OfferNodeType,
supplier_uuid=graphene.String(required=True),
product_uuid=graphene.String(required=True),
description="Get offers from a supplier for a specific product",
)
products_near_hub = graphene.List(
ProductType,
hub_uuid=graphene.String(required=True),
radius_km=graphene.Float(default_value=500),
description="Get products available near a hub",
)
offers_by_hub = graphene.List(
ProductRouteOptionType,
hub_uuid=graphene.String(required=True),
product_uuid=graphene.String(required=True),
limit=graphene.Int(default_value=10),
description="Get offers for a product with routes to hub (auto → rail* → auto)",
)
offer_to_hub = graphene.Field(
ProductRouteOptionType,
offer_uuid=graphene.String(required=True),
hub_uuid=graphene.String(required=True),
description="Get route from a specific offer to hub",
)
@staticmethod
def _build_routes(db, from_uuid, to_uuid, limit):
"""Shared helper to compute K shortest routes between two nodes."""
aql = """
FOR path IN ANY K_SHORTEST_PATHS
@from_vertex TO @to_vertex
GRAPH 'optovia_graph'
OPTIONS { weightAttribute: 'distance_km' }
LIMIT @limit
RETURN {
vertices: path.vertices,
edges: path.edges,
weight: path.weight
}
"""
try:
cursor = db.aql.execute(
aql,
bind_vars={
'from_vertex': f'nodes/{from_uuid}',
'to_vertex': f'nodes/{to_uuid}',
'limit': limit,
},
)
paths = list(cursor)
except Exception as e:
logger.error("K_SHORTEST_PATHS query failed: %s", e)
return []
if not paths:
logger.info("No paths found from %s to %s", from_uuid, to_uuid)
return []
routes = []
for path in paths:
vertices = path.get('vertices', [])
edges = path.get('edges', [])
# Build vertex lookup by _id
vertex_by_id = {v['_id']: v for v in vertices}
stages = []
for edge in edges:
from_node = vertex_by_id.get(edge['_from'], {})
to_node = vertex_by_id.get(edge['_to'], {})
stages.append(RouteStageType(
from_uuid=from_node.get('_key'),
from_name=from_node.get('name'),
from_lat=from_node.get('latitude'),
from_lon=from_node.get('longitude'),
to_uuid=to_node.get('_key'),
to_name=to_node.get('name'),
to_lat=to_node.get('latitude'),
to_lon=to_node.get('longitude'),
distance_km=edge.get('distance_km'),
travel_time_seconds=edge.get('travel_time_seconds'),
transport_type=edge.get('transport_type'),
))
total_time = sum(s.travel_time_seconds or 0 for s in stages)
routes.append(RoutePathType(
total_distance_km=path.get('weight'),
total_time_seconds=total_time,
stages=stages,
))
return routes
def resolve_node(self, info, uuid):
"""
Get a single node with all its outgoing edges.
Returns node info + list of edges to neighbors with distances.
"""
db = get_db()
# Get node
nodes_col = db.collection('nodes')
node = nodes_col.get(uuid)
if not node:
return None
# Get all outgoing edges from this node
edges_col = db.collection('edges')
aql = """
FOR edge IN edges
FILTER edge._from == @from_id
LET to_node = DOCUMENT(edge._to)
RETURN {
to_uuid: to_node._key,
to_name: to_node.name,
to_latitude: to_node.latitude,
to_longitude: to_node.longitude,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds,
transport_type: edge.transport_type
}
"""
cursor = db.aql.execute(aql, bind_vars={'from_id': f"nodes/{uuid}"})
edges = list(cursor)
logger.info("Node %s has %d edges", uuid, len(edges))
return NodeType(
uuid=node['_key'],
name=node.get('name'),
latitude=node.get('latitude'),
longitude=node.get('longitude'),
country=node.get('country'),
country_code=node.get('country_code'),
synced_at=node.get('synced_at'),
transport_types=node.get('transport_types') or [],
edges=[EdgeType(**e) for e in edges],
)
def resolve_nodes(self, info, limit=None, offset=None, transport_type=None, country=None, search=None):
"""Get all logistics nodes (without edges for list view)."""
db = get_db()
# Only return logistics nodes (not buyer/seller addresses)
aql = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
LET types = node.transport_types != null ? node.transport_types : []
FILTER @transport_type == null OR @transport_type IN types
FILTER @country == null OR node.country == @country
FILTER @search == null OR CONTAINS(LOWER(node.name), LOWER(@search)) OR CONTAINS(LOWER(node.country), LOWER(@search))
SORT node.name ASC
LIMIT @offset, @limit
RETURN node
"""
cursor = db.aql.execute(
aql,
bind_vars={
'transport_type': transport_type,
'country': country,
'search': search,
'offset': 0 if offset is None else offset,
'limit': 1000000 if limit is None else limit,
},
)
nodes = []
for node in cursor:
nodes.append(NodeType(
uuid=node['_key'],
name=node.get('name'),
latitude=node.get('latitude'),
longitude=node.get('longitude'),
country=node.get('country'),
country_code=node.get('country_code'),
synced_at=node.get('synced_at'),
transport_types=node.get('transport_types') or [],
edges=[], # Don't load edges for list
))
logger.info("Returning %d nodes", len(nodes))
return nodes
def resolve_nodes_count(self, info, transport_type=None, country=None):
db = get_db()
aql = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
LET types = node.transport_types != null ? node.transport_types : []
FILTER @transport_type == null OR @transport_type IN types
FILTER @country == null OR node.country == @country
COLLECT WITH COUNT INTO length
RETURN length
"""
cursor = db.aql.execute(aql, bind_vars={'transport_type': transport_type, 'country': country})
return next(cursor, 0)
def resolve_hub_countries(self, info):
"""Get unique country names from logistics hubs."""
db = get_db()
aql = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
FILTER node.country != null
COLLECT country = node.country
SORT country ASC
RETURN country
"""
cursor = db.aql.execute(aql)
return list(cursor)
def resolve_nearest_nodes(self, info, lat, lon, limit=5):
"""Find nearest logistics nodes to given coordinates."""
db = get_db()
# Get all logistics nodes and calculate distance
aql = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
FILTER node.latitude != null AND node.longitude != null
LET dist = DISTANCE(node.latitude, node.longitude, @lat, @lon) / 1000
SORT dist ASC
LIMIT @limit
RETURN MERGE(node, {distance_km: dist})
"""
cursor = db.aql.execute(
aql,
bind_vars={'lat': lat, 'lon': lon, 'limit': limit},
)
nodes = []
for node in cursor:
nodes.append(NodeType(
uuid=node['_key'],
name=node.get('name'),
latitude=node.get('latitude'),
longitude=node.get('longitude'),
country=node.get('country'),
country_code=node.get('country_code'),
synced_at=node.get('synced_at'),
transport_types=node.get('transport_types') or [],
edges=[],
))
return nodes
def resolve_node_connections(self, info, uuid, limit_auto=12, limit_rail=12):
"""Get auto edges from hub and rail edges from nearest rail node."""
db = get_db()
nodes_col = db.collection('nodes')
hub = nodes_col.get(uuid)
if not hub:
return None
aql = """
LET auto_edges = (
FOR edge IN edges
FILTER edge._from == @from_id AND edge.transport_type == "auto"
LET to_node = DOCUMENT(edge._to)
FILTER to_node != null
SORT edge.distance_km ASC
LIMIT @limit_auto
RETURN {
to_uuid: to_node._key,
to_name: to_node.name,
to_latitude: to_node.latitude,
to_longitude: to_node.longitude,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds,
transport_type: edge.transport_type
}
)
LET hub_has_rail = @hub_has_rail
LET rail_node = hub_has_rail ? DOCUMENT(@from_id) : FIRST(
FOR node IN nodes
FILTER node.latitude != null AND node.longitude != null
FILTER 'rail' IN node.transport_types
SORT DISTANCE(@hub_lat, @hub_lon, node.latitude, node.longitude)
LIMIT 1
RETURN node
)
LET rail_edges = rail_node == null ? [] : (
FOR edge IN edges
FILTER edge._from == CONCAT("nodes/", rail_node._key) AND edge.transport_type == "rail"
LET to_node = DOCUMENT(edge._to)
FILTER to_node != null
SORT edge.distance_km ASC
LIMIT @limit_rail
RETURN {
to_uuid: to_node._key,
to_name: to_node.name,
to_latitude: to_node.latitude,
to_longitude: to_node.longitude,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds,
transport_type: edge.transport_type
}
)
RETURN {
hub: DOCUMENT(@from_id),
rail_node: rail_node,
auto_edges: auto_edges,
rail_edges: rail_edges
}
"""
cursor = db.aql.execute(
aql,
bind_vars={
'from_id': f"nodes/{uuid}",
'hub_lat': hub.get('latitude'),
'hub_lon': hub.get('longitude'),
'hub_has_rail': 'rail' in (hub.get('transport_types') or []),
'limit_auto': limit_auto,
'limit_rail': limit_rail,
},
)
result = next(cursor, None)
if not result:
return None
def build_node(doc):
if not doc:
return None
return NodeType(
uuid=doc['_key'],
name=doc.get('name'),
latitude=doc.get('latitude'),
longitude=doc.get('longitude'),
country=doc.get('country'),
country_code=doc.get('country_code'),
synced_at=doc.get('synced_at'),
transport_types=doc.get('transport_types') or [],
edges=[],
)
return NodeConnectionsType(
hub=build_node(result.get('hub')),
rail_node=build_node(result.get('rail_node')),
auto_edges=[EdgeType(**e) for e in result.get('auto_edges') or []],
rail_edges=[EdgeType(**e) for e in result.get('rail_edges') or []],
)
def resolve_auto_route(self, info, from_lat, from_lon, to_lat, to_lon):
"""Get auto route via GraphHopper."""
url = f"{settings.GRAPHHOPPER_EXTERNAL_URL}/route"
params = {
'point': [f"{from_lat},{from_lon}", f"{to_lat},{to_lon}"],
'profile': 'car',
'instructions': 'false',
'calc_points': 'true',
'points_encoded': 'false',
}
try:
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
data = response.json()
if 'paths' in data and len(data['paths']) > 0:
path = data['paths'][0]
distance_km = round(path.get('distance', 0) / 1000, 2)
points = path.get('points', {})
coordinates = points.get('coordinates', [])
return RouteType(
distance_km=distance_km,
geometry=coordinates,
)
except requests.RequestException as e:
logger.error("GraphHopper request failed: %s", e)
return None
def resolve_rail_route(self, info, from_lat, from_lon, to_lat, to_lon):
"""Get rail route via OpenRailRouting."""
url = f"{settings.OPENRAILROUTING_EXTERNAL_URL}/route"
params = {
'point': [f"{from_lat},{from_lon}", f"{to_lat},{to_lon}"],
'profile': 'all_tracks',
'calc_points': 'true',
'points_encoded': 'false',
}
try:
response = requests.get(url, params=params, timeout=60)
response.raise_for_status()
data = response.json()
if 'paths' in data and len(data['paths']) > 0:
path = data['paths'][0]
distance_km = round(path.get('distance', 0) / 1000, 2)
points = path.get('points', {})
coordinates = points.get('coordinates', [])
return RouteType(
distance_km=distance_km,
geometry=coordinates,
)
except requests.RequestException as e:
logger.error("OpenRailRouting request failed: %s", e)
return None
def resolve_clustered_nodes(self, info, west, south, east, north, zoom, transport_type=None):
"""Get clustered nodes for map display using server-side SuperCluster."""
db = get_db()
clusters = get_clustered_nodes(db, west, south, east, north, zoom, transport_type)
return [ClusterPointType(**c) for c in clusters]
def resolve_products(self, info):
"""Get unique products from all offers."""
db = get_db()
aql = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.product_uuid != null
COLLECT product_uuid = node.product_uuid INTO offers
LET first_offer = FIRST(offers).node
RETURN {
uuid: product_uuid,
name: first_offer.product_name
}
"""
try:
cursor = db.aql.execute(aql)
products = [ProductType(uuid=p['uuid'], name=p.get('name')) for p in cursor]
logger.info("Found %d unique products", len(products))
return products
except Exception as e:
logger.error("Error getting products: %s", e)
return []
def resolve_offers_by_product(self, info, product_uuid):
"""Get all offers for a product."""
db = get_db()
aql = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.product_uuid == @product_uuid
RETURN node
"""
try:
cursor = db.aql.execute(aql, bind_vars={'product_uuid': product_uuid})
offers = []
for node in cursor:
offers.append(OfferNodeType(
uuid=node['_key'],
product_uuid=node.get('product_uuid'),
product_name=node.get('product_name'),
supplier_uuid=node.get('supplier_uuid'),
latitude=node.get('latitude'),
longitude=node.get('longitude'),
country=node.get('country'),
country_code=node.get('country_code'),
price_per_unit=node.get('price_per_unit'),
currency=node.get('currency'),
quantity=node.get('quantity'),
unit=node.get('unit'),
))
logger.info("Found %d offers for product %s", len(offers), product_uuid)
return offers
except Exception as e:
logger.error("Error getting offers by product: %s", e)
return []
def resolve_hubs_near_offer(self, info, offer_uuid, limit=12):
"""Get nearest hubs to an offer location."""
db = get_db()
nodes_col = db.collection('nodes')
offer = nodes_col.get(offer_uuid)
if not offer:
logger.info("Offer %s not found", offer_uuid)
return []
lat = offer.get('latitude')
lon = offer.get('longitude')
if lat is None or lon is None:
logger.info("Offer %s has no coordinates", offer_uuid)
return []
aql = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
FILTER node.product_uuid == null
FILTER node.latitude != null AND node.longitude != null
LET dist = DISTANCE(node.latitude, node.longitude, @lat, @lon) / 1000
SORT dist ASC
LIMIT @limit
RETURN MERGE(node, {distance_km: dist})
"""
try:
cursor = db.aql.execute(aql, bind_vars={'lat': lat, 'lon': lon, 'limit': limit})
hubs = []
for node in cursor:
hubs.append(NodeType(
uuid=node['_key'],
name=node.get('name'),
latitude=node.get('latitude'),
longitude=node.get('longitude'),
country=node.get('country'),
country_code=node.get('country_code'),
synced_at=node.get('synced_at'),
transport_types=node.get('transport_types') or [],
edges=[],
))
logger.info("Found %d hubs near offer %s", len(hubs), offer_uuid)
return hubs
except Exception as e:
logger.error("Error getting hubs near offer: %s", e)
return []
def resolve_suppliers(self, info):
"""Get unique suppliers from all offers."""
db = get_db()
aql = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.supplier_uuid != null
COLLECT supplier_uuid = node.supplier_uuid
RETURN { uuid: supplier_uuid }
"""
try:
cursor = db.aql.execute(aql)
suppliers = [SupplierType(uuid=s['uuid']) for s in cursor]
logger.info("Found %d unique suppliers", len(suppliers))
return suppliers
except Exception as e:
logger.error("Error getting suppliers: %s", e)
return []
def resolve_products_by_supplier(self, info, supplier_uuid):
"""Get products offered by a supplier."""
db = get_db()
aql = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.supplier_uuid == @supplier_uuid
FILTER node.product_uuid != null
COLLECT product_uuid = node.product_uuid INTO offers
LET first_offer = FIRST(offers).node
RETURN {
uuid: product_uuid,
name: first_offer.product_name
}
"""
try:
cursor = db.aql.execute(aql, bind_vars={'supplier_uuid': supplier_uuid})
products = [ProductType(uuid=p['uuid'], name=p.get('name')) for p in cursor]
logger.info("Found %d products for supplier %s", len(products), supplier_uuid)
return products
except Exception as e:
logger.error("Error getting products by supplier: %s", e)
return []
def resolve_offers_by_supplier_product(self, info, supplier_uuid, product_uuid):
"""Get offers from a supplier for a specific product."""
db = get_db()
aql = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.supplier_uuid == @supplier_uuid
FILTER node.product_uuid == @product_uuid
RETURN node
"""
try:
cursor = db.aql.execute(aql, bind_vars={
'supplier_uuid': supplier_uuid,
'product_uuid': product_uuid
})
offers = []
for node in cursor:
offers.append(OfferNodeType(
uuid=node['_key'],
product_uuid=node.get('product_uuid'),
product_name=node.get('product_name'),
supplier_uuid=node.get('supplier_uuid'),
latitude=node.get('latitude'),
longitude=node.get('longitude'),
country=node.get('country'),
country_code=node.get('country_code'),
price_per_unit=node.get('price_per_unit'),
currency=node.get('currency'),
quantity=node.get('quantity'),
unit=node.get('unit'),
))
logger.info("Found %d offers for supplier %s product %s", len(offers), supplier_uuid, product_uuid)
return offers
except Exception as e:
logger.error("Error getting offers by supplier product: %s", e)
return []
def resolve_products_near_hub(self, info, hub_uuid, radius_km=500):
"""Get products available near a hub (within radius)."""
db = get_db()
nodes_col = db.collection('nodes')
hub = nodes_col.get(hub_uuid)
if not hub:
logger.info("Hub %s not found", hub_uuid)
return []
lat = hub.get('latitude')
lon = hub.get('longitude')
if lat is None or lon is None:
logger.info("Hub %s has no coordinates", hub_uuid)
return []
aql = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.product_uuid != null
FILTER node.latitude != null AND node.longitude != null
LET dist = DISTANCE(node.latitude, node.longitude, @lat, @lon) / 1000
FILTER dist <= @radius_km
COLLECT product_uuid = node.product_uuid INTO offers
LET first_offer = FIRST(offers).node
RETURN {
uuid: product_uuid,
name: first_offer.product_name
}
"""
try:
cursor = db.aql.execute(aql, bind_vars={'lat': lat, 'lon': lon, 'radius_km': radius_km})
products = [ProductType(uuid=p['uuid'], name=p.get('name')) for p in cursor]
logger.info("Found %d products near hub %s", len(products), hub_uuid)
return products
except Exception as e:
logger.error("Error getting products near hub: %s", e)
return []
def resolve_offers_by_hub(self, info, hub_uuid, product_uuid, limit=10):
"""
Get offers for a product with routes to hub.
Uses phase-based routing: auto → rail* → auto
Search goes from hub outward to find offers.
"""
db = get_db()
ensure_graph()
nodes_col = db.collection('nodes')
# Get hub
hub = nodes_col.get(hub_uuid)
if not hub:
logger.info("Hub %s not found", hub_uuid)
return []
hub_lat = hub.get('latitude')
hub_lon = hub.get('longitude')
if hub_lat is None or hub_lon is None:
logger.info("Hub %s missing coordinates", hub_uuid)
return []
# Phase-based routing: auto → rail* → auto
def allowed_next_phase(current_phase, transport_type):
"""
Phases — расширение радиуса поиска, ЖД не обязателен:
- end_auto: можно 1 авто, rail, или сразу offer
- end_auto_done: авто использовано — rail или offer
- rail: любое кол-во rail, потом 1 авто или offer
- start_auto_done: авто использовано — только offer
"""
if current_phase == 'end_auto':
if transport_type == 'offer':
return 'offer'
if transport_type == 'auto':
return 'end_auto_done'
if transport_type == 'rail':
return 'rail'
return None
if current_phase == 'end_auto_done':
if transport_type == 'offer':
return 'offer'
if transport_type == 'rail':
return 'rail'
return None
if current_phase == 'rail':
if transport_type == 'offer':
return 'offer'
if transport_type == 'rail':
return 'rail'
if transport_type == 'auto':
return 'start_auto_done'
return None
if current_phase == 'start_auto_done':
if transport_type == 'offer':
return 'offer'
return None
return None
def fetch_neighbors(node_key, phase):
"""Get neighbors based on allowed transport types for phase."""
if phase == 'end_auto':
types = ['auto', 'rail', 'offer']
elif phase == 'end_auto_done':
types = ['rail', 'offer']
elif phase == 'rail':
types = ['rail', 'auto', 'offer']
elif phase == 'start_auto_done':
types = ['offer']
else:
types = ['offer']
aql = """
FOR edge IN edges
FILTER edge.transport_type IN @types
FILTER edge._from == @node_id OR edge._to == @node_id
LET neighbor_id = edge._from == @node_id ? edge._to : edge._from
LET neighbor = DOCUMENT(neighbor_id)
FILTER neighbor != null
RETURN {
neighbor_key: neighbor._key,
neighbor_doc: neighbor,
from_id: edge._from,
to_id: edge._to,
transport_type: edge.transport_type,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds
}
"""
cursor = db.aql.execute(
aql,
bind_vars={
'node_id': f"nodes/{node_key}",
'types': types,
},
)
return list(cursor)
# Priority queue: (cost, seq, node_key, phase)
queue = []
counter = 0
heapq.heappush(queue, (0, counter, hub_uuid, 'end_auto'))
visited = {}
predecessors = {}
node_docs = {hub_uuid: hub}
found_routes = []
expansions = 0
while queue and len(found_routes) < limit and expansions < Query.MAX_EXPANSIONS:
cost, _, node_key, phase = heapq.heappop(queue)
if (node_key, phase) in visited and cost > visited[(node_key, phase)]:
continue
# Found an offer for the product
node_doc = node_docs.get(node_key)
if node_doc and node_doc.get('product_uuid') == product_uuid:
path_edges = []
state = (node_key, phase)
current_key = node_key
while state in predecessors:
prev_state, edge_info = predecessors[state]
prev_key = prev_state[0]
path_edges.append((current_key, prev_key, edge_info))
state = prev_state
current_key = prev_key
route = _build_route_from_edges(path_edges, node_docs)
distance_km = None
src_lat = node_doc.get('latitude')
src_lon = node_doc.get('longitude')
if src_lat is not None and src_lon is not None:
distance_km = _distance_km(src_lat, src_lon, hub_lat, hub_lon)
found_routes.append(ProductRouteOptionType(
source_uuid=node_key,
source_name=node_doc.get('name') or node_doc.get('product_name'),
source_lat=node_doc.get('latitude'),
source_lon=node_doc.get('longitude'),
distance_km=distance_km,
routes=[route] if route else [],
))
continue
neighbors = fetch_neighbors(node_key, phase)
expansions += 1
for neighbor in neighbors:
transport_type = neighbor.get('transport_type')
next_phase = allowed_next_phase(phase, transport_type)
if next_phase is None:
continue
travel_time = neighbor.get('travel_time_seconds')
distance_km = neighbor.get('distance_km')
neighbor_key = neighbor.get('neighbor_key')
node_docs[neighbor_key] = neighbor.get('neighbor_doc')
step_cost = travel_time if travel_time is not None else (distance_km or 0)
new_cost = cost + step_cost
state_key = (neighbor_key, next_phase)
if state_key in visited and new_cost >= visited[state_key]:
continue
visited[state_key] = new_cost
counter += 1
heapq.heappush(queue, (new_cost, counter, neighbor_key, next_phase))
predecessors[state_key] = ((node_key, phase), neighbor)
if not found_routes:
logger.info("No offers found for product %s near hub %s", product_uuid, hub_uuid)
return []
logger.info("Found %d offers for product %s near hub %s", len(found_routes), product_uuid, hub_uuid)
return found_routes
def resolve_offer_to_hub(self, info, offer_uuid, hub_uuid):
"""
Get route from a specific offer to hub.
Uses phase-based routing: auto → rail* → auto
"""
db = get_db()
ensure_graph()
nodes_col = db.collection('nodes')
offer = nodes_col.get(offer_uuid)
if not offer:
logger.info("Offer %s not found", offer_uuid)
return None
hub = nodes_col.get(hub_uuid)
if not hub:
logger.info("Hub %s not found", hub_uuid)
return None
hub_lat = hub.get('latitude')
hub_lon = hub.get('longitude')
offer_lat = offer.get('latitude')
offer_lon = offer.get('longitude')
# Phase-based routing from hub to offer
def allowed_next_phase(current_phase, transport_type):
if current_phase == 'end_auto':
if transport_type == 'offer':
return 'offer'
if transport_type == 'auto':
return 'end_auto_done'
if transport_type == 'rail':
return 'rail'
return None
if current_phase == 'end_auto_done':
if transport_type == 'offer':
return 'offer'
if transport_type == 'rail':
return 'rail'
return None
if current_phase == 'rail':
if transport_type == 'offer':
return 'offer'
if transport_type == 'rail':
return 'rail'
if transport_type == 'auto':
return 'start_auto_done'
return None
if current_phase == 'start_auto_done':
if transport_type == 'offer':
return 'offer'
return None
return None
def fetch_neighbors(node_key, phase):
if phase == 'end_auto':
types = ['auto', 'rail', 'offer']
elif phase == 'end_auto_done':
types = ['rail', 'offer']
elif phase == 'rail':
types = ['rail', 'auto', 'offer']
elif phase == 'start_auto_done':
types = ['offer']
else:
types = ['offer']
aql = """
FOR edge IN edges
FILTER edge.transport_type IN @types
FILTER edge._from == @node_id OR edge._to == @node_id
LET neighbor_id = edge._from == @node_id ? edge._to : edge._from
LET neighbor = DOCUMENT(neighbor_id)
FILTER neighbor != null
RETURN {
neighbor_key: neighbor._key,
neighbor_doc: neighbor,
from_id: edge._from,
to_id: edge._to,
transport_type: edge.transport_type,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds
}
"""
cursor = db.aql.execute(
aql,
bind_vars={
'node_id': f"nodes/{node_key}",
'types': types,
},
)
return list(cursor)
queue = []
counter = 0
heapq.heappush(queue, (0, counter, hub_uuid, 'end_auto'))
visited = {}
predecessors = {}
node_docs = {hub_uuid: hub, offer_uuid: offer}
expansions = 0
while queue and expansions < Query.MAX_EXPANSIONS:
cost, _, node_key, phase = heapq.heappop(queue)
if (node_key, phase) in visited and cost > visited[(node_key, phase)]:
continue
# Found the specific offer
if node_key == offer_uuid:
path_edges = []
state = (node_key, phase)
current_key = node_key
while state in predecessors:
prev_state, edge_info = predecessors[state]
prev_key = prev_state[0]
path_edges.append((current_key, prev_key, edge_info))
state = prev_state
current_key = prev_key
route = _build_route_from_edges(path_edges, node_docs)
distance_km = None
if offer_lat is not None and offer_lon is not None and hub_lat is not None and hub_lon is not None:
distance_km = _distance_km(offer_lat, offer_lon, hub_lat, hub_lon)
return ProductRouteOptionType(
source_uuid=offer_uuid,
source_name=offer.get('name') or offer.get('product_name'),
source_lat=offer_lat,
source_lon=offer_lon,
distance_km=distance_km,
routes=[route] if route else [],
)
neighbors = fetch_neighbors(node_key, phase)
expansions += 1
for neighbor in neighbors:
transport_type = neighbor.get('transport_type')
next_phase = allowed_next_phase(phase, transport_type)
if next_phase is None:
continue
travel_time = neighbor.get('travel_time_seconds')
distance_km = neighbor.get('distance_km')
neighbor_key = neighbor.get('neighbor_key')
node_docs[neighbor_key] = neighbor.get('neighbor_doc')
step_cost = travel_time if travel_time is not None else (distance_km or 0)
new_cost = cost + step_cost
state_key = (neighbor_key, next_phase)
if state_key in visited and new_cost >= visited[state_key]:
continue
visited[state_key] = new_cost
counter += 1
heapq.heappush(queue, (new_cost, counter, neighbor_key, next_phase))
predecessors[state_key] = ((node_key, phase), neighbor)
logger.info("No route found from offer %s to hub %s", offer_uuid, hub_uuid)
return None
schema = graphene.Schema(query=Query)
# Helper methods attached to Query for route assembly
def _build_stage(from_doc, to_doc, transport_type, edges):
distance_km = sum(e.get('distance_km') or 0 for e in edges)
travel_time = sum(e.get('travel_time_seconds') or 0 for e in edges)
return RouteStageType(
from_uuid=from_doc.get('_key') if from_doc else None,
from_name=from_doc.get('name') if from_doc else None,
from_lat=from_doc.get('latitude') if from_doc else None,
from_lon=from_doc.get('longitude') if from_doc else None,
to_uuid=to_doc.get('_key') if to_doc else None,
to_name=to_doc.get('name') if to_doc else None,
to_lat=to_doc.get('latitude') if to_doc else None,
to_lon=to_doc.get('longitude') if to_doc else None,
distance_km=distance_km,
travel_time_seconds=travel_time,
transport_type=transport_type,
)
def _build_route_from_edges(path_edges, node_docs):
"""Собрать RoutePathType из списка ребёр (source->dest), схлопывая типы."""
if not path_edges:
return None
# Фильтруем offer edges - это не транспортные этапы, а связь оффера с локацией
path_edges = [(f, t, e) for f, t, e in path_edges if e.get('transport_type') != 'offer']
if not path_edges:
return None
stages = []
current_edges = []
current_type = None
segment_start = None
for from_key, to_key, edge in path_edges:
edge_type = edge.get('transport_type')
if current_type is None:
current_type = edge_type
current_edges = [edge]
segment_start = from_key
elif edge_type == current_type:
current_edges.append(edge)
else:
# закрываем предыдущий сегмент
stages.append(_build_stage(
node_docs.get(segment_start),
node_docs.get(from_key),
current_type,
current_edges,
))
current_type = edge_type
current_edges = [edge]
segment_start = from_key
# Последний сгусток
last_to = path_edges[-1][1]
stages.append(_build_stage(
node_docs.get(segment_start),
node_docs.get(last_to),
current_type,
current_edges,
))
total_distance = sum(s.distance_km or 0 for s in stages)
total_time = sum(s.travel_time_seconds or 0 for s in stages)
return RoutePathType(
total_distance_km=total_distance,
total_time_seconds=total_time,
stages=stages,
)
# Bind helpers to class for access in resolver
Query._build_route_from_edges = _build_route_from_edges
def _distance_km(lat1, lon1, lat2, lon2):
"""Haversine distance in km."""
r = 6371
d_lat = math.radians(lat2 - lat1)
d_lon = math.radians(lon2 - lon1)
a = (
math.sin(d_lat / 2) ** 2
+ math.cos(math.radians(lat1))
* math.cos(math.radians(lat2))
* math.sin(d_lon / 2) ** 2
)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return r * c
Query._distance_km = _distance_km