Files
geo/geo_app/schema.py
Ruslan Bakiev 3648366ebe
All checks were successful
Build Docker Image / build (push) Successful in 1m52s
feat(geo): graph-based hubs for product
2026-02-07 10:14:18 +07:00

2285 lines
88 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""GraphQL schema for Geo service."""
import logging
import heapq
import math
import requests
import graphene
from django.conf import settings
from .arango_client import get_db, ensure_graph
from .cluster_index import get_clustered_nodes
logger = logging.getLogger(__name__)
class EdgeType(graphene.ObjectType):
    """Edge between two nodes (route).

    Describes a single connection as seen from its source node: the
    destination node's identity and coordinates plus the edge's own
    distance, travel time and transport mode.
    """
    # Destination node identity and position.
    to_uuid = graphene.String()
    to_name = graphene.String()
    to_latitude = graphene.Float()
    to_longitude = graphene.Float()
    # Attributes stored on the edge itself.
    distance_km = graphene.Float()
    travel_time_seconds = graphene.Int()
    transport_type = graphene.String()
class NodeType(graphene.ObjectType):
    """Logistics node with edges to neighbors.

    Resolvers in this module build instances from documents of the
    ``nodes`` collection; list-style resolvers pass an empty ``edges`` list.
    """
    # Node identity (the document ``_key``) and display name.
    uuid = graphene.String()
    name = graphene.String()
    # Geographic position.
    latitude = graphene.Float()
    longitude = graphene.Float()
    country = graphene.String()
    country_code = graphene.String()
    # Copied as-is from the stored document's ``synced_at`` value.
    synced_at = graphene.String()
    # Transport modes available at the node (e.g. 'rail', 'sea', as used in
    # the AQL filters of this module).
    transport_types = graphene.List(graphene.String)
    # Outgoing edges; only filled by resolvers that load them.
    edges = graphene.List(EdgeType)
    # NOTE(review): presumably the distance from the query point for
    # "nearest"-style resolvers — confirm against the resolvers that set it.
    distance_km = graphene.Float()
class NodeConnectionsType(graphene.ObjectType):
    """Auto + rail edges for a node, rail uses nearest rail node."""
    # The node the connections were requested for.
    hub = graphene.Field(NodeType)
    # Node whose rail edges are reported: the hub itself when it lists
    # 'rail' among its transport types, otherwise the nearest node that does.
    rail_node = graphene.Field(NodeType)
    # Edges with transport type "auto" that start at the hub.
    auto_edges = graphene.List(EdgeType)
    # Edges with transport type "rail" that start at ``rail_node``.
    rail_edges = graphene.List(EdgeType)
class RouteType(graphene.ObjectType):
    """Route between two points with geometry."""
    # Route length in kilometres (the routing resolvers convert from the
    # service's ``distance`` value by dividing by 1000).
    distance_km = graphene.Float()
    # Coordinate list taken from the routing service response.
    geometry = graphene.JSONString(description="GeoJSON LineString coordinates")
class RouteStageType(graphene.ObjectType):
    """Single stage in a multi-hop route.

    One stage corresponds to one graph edge: it carries the identity and
    position of both endpoint nodes plus the edge's own attributes.
    """
    # Start node of the stage.
    from_uuid = graphene.String()
    from_name = graphene.String()
    from_lat = graphene.Float()
    from_lon = graphene.Float()
    # End node of the stage.
    to_uuid = graphene.String()
    to_name = graphene.String()
    to_lat = graphene.Float()
    to_lon = graphene.Float()
    # Attributes of the edge travelled in this stage.
    distance_km = graphene.Float()
    travel_time_seconds = graphene.Int()
    transport_type = graphene.String()
class RoutePathType(graphene.ObjectType):
    """Complete route through graph with multiple stages."""
    # Total path weight (``distance_km`` is the weight attribute used by the
    # shortest-path query in this module).
    total_distance_km = graphene.Float()
    # Sum of the stages' travel times; stages without a time count as 0.
    total_time_seconds = graphene.Int()
    # Ordered list of stages, one per edge of the path.
    stages = graphene.List(RouteStageType)
class ProductRouteOptionType(graphene.ObjectType):
    """Route options for a product source to the destination."""
    # Identity and position of the source the routes start from.
    source_uuid = graphene.String()
    source_name = graphene.String()
    source_lat = graphene.Float()
    source_lon = graphene.Float()
    # NOTE(review): presumably the source-to-destination distance; the
    # resolvers that fill it are outside the visible part of the file.
    distance_km = graphene.Float()
    # Alternative paths from the source to the destination.
    routes = graphene.List(RoutePathType)
class ClusterPointType(graphene.ObjectType):
    """Cluster or individual point for map display.

    Instances are created by unpacking the dicts returned by
    ``get_clustered_nodes``, so those dicts must use exactly these keys.
    """
    id = graphene.String(description="UUID for points, 'cluster-N' for clusters")
    latitude = graphene.Float()
    longitude = graphene.Float()
    count = graphene.Int(description="1 for single point, >1 for cluster")
    expansion_zoom = graphene.Int(description="Zoom level to expand cluster")
    name = graphene.String(description="Node name (only for single points)")
class ProductType(graphene.ObjectType):
    """Unique product from offers."""
    uuid = graphene.String()
    name = graphene.String()
    offers_count = graphene.Int(description="Number of offers for this product")

    def resolve_offers_count(self, info):
        """Count the offer nodes whose ``product_uuid`` equals this product's uuid.

        Runs one AQL count query per resolved product. Any failure while
        querying is logged and reported as a count of 0.
        """
        database = get_db()
        count_query = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.product_uuid == @product_uuid
COLLECT WITH COUNT INTO length
RETURN length
"""
        try:
            params = {'product_uuid': self.uuid}
            rows = database.aql.execute(count_query, bind_vars=params)
            # The query yields a single number; fall back to 0 on an empty cursor.
            total = next(rows, 0)
        except Exception as exc:
            logger.error("Error counting offers for product %s: %s", self.uuid, exc)
            total = 0
        return total
class SupplierType(graphene.ObjectType):
    """Unique supplier from offers.

    The catalog resolvers visible in this module only fill ``uuid``; the
    remaining fields stay unset unless another resolver provides them.
    """
    uuid = graphene.String()
    name = graphene.String()
    latitude = graphene.Float()
    longitude = graphene.Float()
    # NOTE(review): presumably distance from a query point — confirm
    # against the resolvers that populate it.
    distance_km = graphene.Float()
class OfferNodeType(graphene.ObjectType):
    """Offer node with location and product info."""
    # Offer identity (the node document ``_key``).
    uuid = graphene.String()
    # Product and supplier the offer belongs to.
    product_uuid = graphene.String()
    product_name = graphene.String()
    supplier_uuid = graphene.String()
    supplier_name = graphene.String()
    # Offer location.
    latitude = graphene.Float()
    longitude = graphene.Float()
    country = graphene.String()
    country_code = graphene.String()
    # Commercial terms; price and quantity are exposed as strings.
    price_per_unit = graphene.String()
    currency = graphene.String()
    quantity = graphene.String()
    unit = graphene.String()
    # NOTE(review): presumably distance from a query point — confirm
    # against the resolvers that populate it.
    distance_km = graphene.Float()
class OfferWithRouteType(graphene.ObjectType):
    """Offer with route information to destination.

    Declares the same scalar fields as ``OfferNodeType`` and adds ``routes``.
    """
    # Offer identity.
    uuid = graphene.String()
    # Product and supplier the offer belongs to.
    product_uuid = graphene.String()
    product_name = graphene.String()
    supplier_uuid = graphene.String()
    supplier_name = graphene.String()
    # Offer location.
    latitude = graphene.Float()
    longitude = graphene.Float()
    country = graphene.String()
    country_code = graphene.String()
    # Commercial terms; price and quantity are exposed as strings.
    price_per_unit = graphene.String()
    currency = graphene.String()
    quantity = graphene.String()
    unit = graphene.String()
    distance_km = graphene.Float()
    # The lambda defers looking up RoutePathType until the schema is built.
    routes = graphene.List(lambda: RoutePathType)
class OfferCalculationType(graphene.ObjectType):
    """Calculation result that may include one or multiple offers."""
    # Offers that make up this calculation; the lambda defers the type lookup
    # until the schema is built.
    offers = graphene.List(lambda: OfferWithRouteType)
class Query(graphene.ObjectType):
    """Root query.

    Declares the GraphQL fields of the Geo service together with their
    arguments. Following the graphene convention, a field ``foo`` is served
    by the ``resolve_foo`` method of this class.
    """
    # NOTE(review): not referenced in the visible part of this file;
    # presumably an upper bound for a graph search — confirm at its use site.
    MAX_EXPANSIONS = 20000

    # --- Node graph queries -------------------------------------------------
    node = graphene.Field(
        NodeType,
        uuid=graphene.String(required=True),
        description="Get node by UUID with all edges to neighbors",
    )
    nodes = graphene.List(
        NodeType,
        description="Get all nodes (without edges for performance)",
        limit=graphene.Int(),
        offset=graphene.Int(),
        transport_type=graphene.String(),
        country=graphene.String(description="Filter by country name"),
        search=graphene.String(description="Search by node name (case-insensitive)"),
        west=graphene.Float(description="Bounding box west longitude"),
        south=graphene.Float(description="Bounding box south latitude"),
        east=graphene.Float(description="Bounding box east longitude"),
        north=graphene.Float(description="Bounding box north latitude"),
    )
    nodes_count = graphene.Int(
        transport_type=graphene.String(),
        country=graphene.String(description="Filter by country name"),
        west=graphene.Float(description="Bounding box west longitude"),
        south=graphene.Float(description="Bounding box south latitude"),
        east=graphene.Float(description="Bounding box east longitude"),
        north=graphene.Float(description="Bounding box north latitude"),
        description="Get total count of nodes (with optional transport/country/bounds filter)",
    )
    hub_countries = graphene.List(
        graphene.String,
        description="List of countries that have logistics hubs",
    )
    nearest_nodes = graphene.List(
        NodeType,
        lat=graphene.Float(required=True, description="Latitude"),
        lon=graphene.Float(required=True, description="Longitude"),
        limit=graphene.Int(default_value=5, description="Max results"),
        description="Find nearest logistics nodes to given coordinates",
    )
    node_connections = graphene.Field(
        NodeConnectionsType,
        uuid=graphene.String(required=True),
        limit_auto=graphene.Int(default_value=12),
        limit_rail=graphene.Int(default_value=12),
        description="Get auto + rail edges for a node (rail uses nearest rail node)",
    )

    # --- Point-to-point routing via external routing services ---------------
    auto_route = graphene.Field(
        RouteType,
        from_lat=graphene.Float(required=True),
        from_lon=graphene.Float(required=True),
        to_lat=graphene.Float(required=True),
        to_lon=graphene.Float(required=True),
        description="Get auto route between two points via GraphHopper",
    )
    rail_route = graphene.Field(
        RouteType,
        from_lat=graphene.Float(required=True),
        from_lon=graphene.Float(required=True),
        to_lat=graphene.Float(required=True),
        to_lon=graphene.Float(required=True),
        description="Get rail route between two points via OpenRailRouting",
    )

    # --- Map clustering -----------------------------------------------------
    clustered_nodes = graphene.List(
        ClusterPointType,
        west=graphene.Float(required=True, description="Bounding box west longitude"),
        south=graphene.Float(required=True, description="Bounding box south latitude"),
        east=graphene.Float(required=True, description="Bounding box east longitude"),
        north=graphene.Float(required=True, description="Bounding box north latitude"),
        zoom=graphene.Int(required=True, description="Map zoom level (0-16)"),
        transport_type=graphene.String(description="Filter by transport type"),
        node_type=graphene.String(description="Node type: logistics, offer, supplier"),
        product_uuid=graphene.String(description="Filter by product UUID"),
        hub_uuid=graphene.String(description="Filter by hub UUID"),
        supplier_uuid=graphene.String(description="Filter by supplier UUID"),
        description="Get clustered nodes for map display (server-side clustering)",
    )
    # Catalog navigation queries
    products = graphene.List(
        ProductType,
        description="Get unique products from all offers",
    )
    offers_by_product = graphene.List(
        OfferNodeType,
        product_uuid=graphene.String(required=True),
        description="Get all offers for a product",
    )
    hubs_near_offer = graphene.List(
        NodeType,
        offer_uuid=graphene.String(required=True),
        limit=graphene.Int(default_value=12),
        description="Get nearest hubs to an offer location",
    )
    suppliers = graphene.List(
        SupplierType,
        description="Get unique suppliers from all offers",
    )
    products_by_supplier = graphene.List(
        ProductType,
        supplier_uuid=graphene.String(required=True),
        description="Get products offered by a supplier",
    )
    offers_by_supplier_product = graphene.List(
        OfferNodeType,
        supplier_uuid=graphene.String(required=True),
        product_uuid=graphene.String(required=True),
        description="Get offers from a supplier for a specific product",
    )
    products_near_hub = graphene.List(
        ProductType,
        hub_uuid=graphene.String(required=True),
        radius_km=graphene.Float(default_value=500),
        description="Get products available near a hub",
    )
    suppliers_for_product = graphene.List(
        SupplierType,
        product_uuid=graphene.String(required=True),
        description="Get suppliers that offer a specific product",
    )
    hubs_for_product = graphene.List(
        NodeType,
        product_uuid=graphene.String(required=True),
        radius_km=graphene.Float(default_value=500),
        description="Get hubs where a product is available nearby",
    )
    offers_by_hub = graphene.List(
        ProductRouteOptionType,
        hub_uuid=graphene.String(required=True),
        product_uuid=graphene.String(required=True),
        limit=graphene.Int(default_value=10),
        description="Get offers for a product with routes to hub (auto → rail* → auto)",
    )
    offer_to_hub = graphene.Field(
        ProductRouteOptionType,
        offer_uuid=graphene.String(required=True),
        hub_uuid=graphene.String(required=True),
        description="Get route from a specific offer to hub",
    )
    # New unified endpoints (coordinate-based)
    nearest_hubs = graphene.List(
        NodeType,
        lat=graphene.Float(required=True, description="Latitude"),
        lon=graphene.Float(required=True, description="Longitude"),
        radius=graphene.Float(default_value=1000, description="Search radius in km"),
        product_uuid=graphene.String(description="Filter hubs by product availability"),
        source_uuid=graphene.String(description="Source node UUID for graph-based nearest hubs"),
        use_graph=graphene.Boolean(default_value=False, description="Use graph-based reachability for product filter"),
        limit=graphene.Int(default_value=12, description="Max results"),
        description="Find nearest hubs to coordinates (optionally filtered by product)",
    )
    nearest_offers = graphene.List(
        OfferWithRouteType,
        lat=graphene.Float(required=True, description="Latitude"),
        lon=graphene.Float(required=True, description="Longitude"),
        radius=graphene.Float(default_value=500, description="Search radius in km"),
        product_uuid=graphene.String(description="Filter by product UUID"),
        hub_uuid=graphene.String(description="Hub UUID - if provided, calculates routes to this hub"),
        limit=graphene.Int(default_value=50, description="Max results"),
        description="Find nearest offers to coordinates with optional routes to hub",
    )
    quote_calculations = graphene.List(
        OfferCalculationType,
        lat=graphene.Float(required=True, description="Latitude"),
        lon=graphene.Float(required=True, description="Longitude"),
        radius=graphene.Float(default_value=500, description="Search radius in km"),
        product_uuid=graphene.String(description="Filter by product UUID"),
        hub_uuid=graphene.String(description="Hub UUID - if provided, calculates routes to this hub"),
        quantity=graphene.Float(description="Requested quantity to cover (optional)"),
        limit=graphene.Int(default_value=10, description="Max calculations"),
        description="Get quote calculations (single offer or split offers)",
    )
    nearest_suppliers = graphene.List(
        SupplierType,
        lat=graphene.Float(required=True, description="Latitude"),
        lon=graphene.Float(required=True, description="Longitude"),
        radius=graphene.Float(default_value=1000, description="Search radius in km"),
        product_uuid=graphene.String(description="Filter by product UUID"),
        limit=graphene.Int(default_value=12, description="Max results"),
        description="Find nearest suppliers to coordinates (optionally filtered by product)",
    )
    route_to_coordinate = graphene.Field(
        ProductRouteOptionType,
        offer_uuid=graphene.String(required=True, description="Starting offer UUID"),
        lat=graphene.Float(required=True, description="Destination latitude"),
        lon=graphene.Float(required=True, description="Destination longitude"),
        description="Get route from offer to target coordinates (finds nearest hub to coordinate)",
    )
    # New unified list endpoints
    hubs_list = graphene.List(
        NodeType,
        limit=graphene.Int(default_value=50, description="Max results"),
        offset=graphene.Int(default_value=0, description="Offset for pagination"),
        country=graphene.String(description="Filter by country name"),
        transport_type=graphene.String(description="Filter by transport type"),
        west=graphene.Float(description="Bounding box west longitude"),
        south=graphene.Float(description="Bounding box south latitude"),
        east=graphene.Float(description="Bounding box east longitude"),
        north=graphene.Float(description="Bounding box north latitude"),
        description="Get paginated list of logistics hubs",
    )
    suppliers_list = graphene.List(
        SupplierType,
        limit=graphene.Int(default_value=50, description="Max results"),
        offset=graphene.Int(default_value=0, description="Offset for pagination"),
        country=graphene.String(description="Filter by country name"),
        west=graphene.Float(description="Bounding box west longitude"),
        south=graphene.Float(description="Bounding box south latitude"),
        east=graphene.Float(description="Bounding box east longitude"),
        north=graphene.Float(description="Bounding box north latitude"),
        description="Get paginated list of suppliers from graph",
    )
    products_list = graphene.List(
        ProductType,
        limit=graphene.Int(default_value=50, description="Max results"),
        offset=graphene.Int(default_value=0, description="Offset for pagination"),
        west=graphene.Float(description="Bounding box west longitude"),
        south=graphene.Float(description="Bounding box south latitude"),
        east=graphene.Float(description="Bounding box east longitude"),
        north=graphene.Float(description="Bounding box north latitude"),
        description="Get paginated list of products from graph",
    )
@staticmethod
def _build_routes(db, from_uuid, to_uuid, limit):
    """Compute up to ``limit`` shortest routes between two graph nodes.

    Args:
        db: Database handle exposing ``aql.execute``.
        from_uuid: Key of the start node in the ``nodes`` collection.
        to_uuid: Key of the destination node in the ``nodes`` collection.
        limit: Maximum number of paths to return.

    Returns:
        A list of RoutePathType, one per path found. Empty when the query
        fails (the error is logged) or when no path exists.
    """
    query = """
FOR path IN ANY K_SHORTEST_PATHS
@from_vertex TO @to_vertex
GRAPH 'optovia_graph'
OPTIONS { weightAttribute: 'distance_km' }
LIMIT @limit
RETURN {
vertices: path.vertices,
edges: path.edges,
weight: path.weight
}
"""
    params = {
        'from_vertex': f'nodes/{from_uuid}',
        'to_vertex': f'nodes/{to_uuid}',
        'limit': limit,
    }
    try:
        found_paths = list(db.aql.execute(query, bind_vars=params))
    except Exception as exc:
        logger.error("K_SHORTEST_PATHS query failed: %s", exc)
        return []

    if not found_paths:
        logger.info("No paths found from %s to %s", from_uuid, to_uuid)
        return []

    def to_stage(edge, lookup):
        # Endpoints missing from the path's vertex list become empty dicts,
        # which leaves the corresponding stage fields as None.
        src = lookup.get(edge['_from'], {})
        dst = lookup.get(edge['_to'], {})
        return RouteStageType(
            from_uuid=src.get('_key'),
            from_name=src.get('name'),
            from_lat=src.get('latitude'),
            from_lon=src.get('longitude'),
            to_uuid=dst.get('_key'),
            to_name=dst.get('name'),
            to_lat=dst.get('latitude'),
            to_lon=dst.get('longitude'),
            distance_km=edge.get('distance_km'),
            travel_time_seconds=edge.get('travel_time_seconds'),
            transport_type=edge.get('transport_type'),
        )

    result = []
    for found in found_paths:
        path_vertices = found.get('vertices', [])
        path_edges = found.get('edges', [])
        # Index the vertices by document id so edge endpoints can be resolved.
        lookup = {vertex['_id']: vertex for vertex in path_vertices}
        stages = [to_stage(edge, lookup) for edge in path_edges]
        result.append(RoutePathType(
            total_distance_km=found.get('weight'),
            # Stages without a travel time contribute 0 seconds.
            total_time_seconds=sum(stage.travel_time_seconds or 0 for stage in stages),
            stages=stages,
        ))
    return result
def resolve_node(self, info, uuid):
    """Return a single node together with all of its outgoing edges.

    Args:
        info: GraphQL resolve info (unused).
        uuid: Key of the node document in the ``nodes`` collection.

    Returns:
        A NodeType whose ``edges`` lists every edge starting at this node
        (destination identity, coordinates, distance, travel time and
        transport type), or ``None`` when no node with that key exists.
    """
    db = get_db()
    node = db.collection('nodes').get(uuid)
    if not node:
        return None

    # Outgoing edges only: the edge's ``_from`` must be this node's document id.
    aql = """
FOR edge IN edges
FILTER edge._from == @from_id
LET to_node = DOCUMENT(edge._to)
RETURN {
to_uuid: to_node._key,
to_name: to_node.name,
to_latitude: to_node.latitude,
to_longitude: to_node.longitude,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds,
transport_type: edge.transport_type
}
"""
    cursor = db.aql.execute(aql, bind_vars={'from_id': f"nodes/{uuid}"})
    # Row keys match the EdgeType field names, so rows unpack directly.
    edges = [EdgeType(**row) for row in cursor]
    logger.info("Node %s has %d edges", uuid, len(edges))
    return NodeType(
        uuid=node['_key'],
        name=node.get('name'),
        latitude=node.get('latitude'),
        longitude=node.get('longitude'),
        country=node.get('country'),
        country_code=node.get('country_code'),
        synced_at=node.get('synced_at'),
        transport_types=node.get('transport_types') or [],
        edges=edges,
    )
def resolve_nodes(self, info, limit=None, offset=None, transport_type=None, country=None, search=None,
                  west=None, south=None, east=None, north=None):
    """List logistics nodes, without edges, sorted by name.

    Optional filters: transport type, exact country name, a
    case-insensitive substring match on node name or country, and a
    bounding box. The box is applied only when all four of ``west``,
    ``south``, ``east`` and ``north`` are given. Without ``offset`` /
    ``limit`` the query starts at 0 and returns up to 1000000 rows.
    """
    db = get_db()
    box = {'west': west, 'south': south, 'east': east, 'north': north}
    has_box = all(edge is not None for edge in box.values())

    bounds_filter = ""
    if has_box:
        bounds_filter = """
FILTER node.latitude != null AND node.longitude != null
FILTER node.latitude >= @south AND node.latitude <= @north
FILTER node.longitude >= @west AND node.longitude <= @east
"""
    # Nodes without a node_type are treated as logistics nodes too.
    aql = f"""
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
LET types = node.transport_types != null ? node.transport_types : []
FILTER @transport_type == null OR @transport_type IN types
FILTER @country == null OR node.country == @country
FILTER @search == null OR CONTAINS(LOWER(node.name), LOWER(@search)) OR CONTAINS(LOWER(node.country), LOWER(@search))
{bounds_filter}
SORT node.name ASC
LIMIT @offset, @limit
RETURN node
"""
    bind_vars = {
        'transport_type': transport_type,
        'country': country,
        'search': search,
        'offset': offset if offset is not None else 0,
        'limit': limit if limit is not None else 1000000,
    }
    # Bounds are bound only when the query text actually references them.
    if has_box:
        bind_vars.update(box)

    nodes = [
        NodeType(
            uuid=doc['_key'],
            name=doc.get('name'),
            latitude=doc.get('latitude'),
            longitude=doc.get('longitude'),
            country=doc.get('country'),
            country_code=doc.get('country_code'),
            synced_at=doc.get('synced_at'),
            transport_types=doc.get('transport_types') or [],
            edges=[],  # edges are not loaded for list views
        )
        for doc in db.aql.execute(aql, bind_vars=bind_vars)
    ]
    logger.info("Returning %d nodes", len(nodes))
    return nodes
def resolve_nodes_count(self, info, transport_type=None, country=None,
                        west=None, south=None, east=None, north=None):
    """Count logistics nodes matching the optional filters.

    Supports transport type, exact country name and a bounding box; the
    box is applied only when all four edges are supplied. Returns 0 when
    the query yields no row.
    """
    db = get_db()
    box = {'west': west, 'south': south, 'east': east, 'north': north}
    has_box = all(edge is not None for edge in box.values())

    bounds_filter = ""
    if has_box:
        bounds_filter = """
FILTER node.latitude != null AND node.longitude != null
FILTER node.latitude >= @south AND node.latitude <= @north
FILTER node.longitude >= @west AND node.longitude <= @east
"""
    aql = f"""
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
LET types = node.transport_types != null ? node.transport_types : []
FILTER @transport_type == null OR @transport_type IN types
FILTER @country == null OR node.country == @country
{bounds_filter}
COLLECT WITH COUNT INTO length
RETURN length
"""
    bind_vars = {'transport_type': transport_type, 'country': country}
    # Bounds are bound only when the query text actually references them.
    if has_box:
        bind_vars.update(box)

    rows = db.aql.execute(aql, bind_vars=bind_vars)
    return next(rows, 0)
def resolve_hub_countries(self, info):
    """Return the distinct, alphabetically sorted countries of rail or sea hubs.

    Only logistics nodes (or nodes without a node_type) that list 'rail'
    or 'sea' among their transport types and have a country are considered.
    """
    query = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
LET types = node.transport_types != null ? node.transport_types : []
FILTER ('rail' IN types) OR ('sea' IN types)
FILTER node.country != null
COLLECT country = node.country
SORT country ASC
RETURN country
"""
    countries = get_db().aql.execute(query)
    return list(countries)
def resolve_nearest_nodes(self, info, lat, lon, limit=5):
    """Find the logistics nodes nearest to the given coordinates.

    Args:
        info: GraphQL resolve info (unused).
        lat: Latitude of the reference point.
        lon: Longitude of the reference point.
        limit: Maximum number of nodes to return.

    Returns:
        NodeType objects ordered by ascending distance, each carrying
        ``distance_km`` (distance to the reference point in kilometres)
        and an empty ``edges`` list.
    """
    db = get_db()
    # DISTANCE() yields metres; dividing by 1000 gives kilometres.
    aql = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
FILTER node.latitude != null AND node.longitude != null
LET dist = DISTANCE(node.latitude, node.longitude, @lat, @lon) / 1000
SORT dist ASC
LIMIT @limit
RETURN MERGE(node, {distance_km: dist})
"""
    cursor = db.aql.execute(
        aql,
        bind_vars={'lat': lat, 'lon': lon, 'limit': limit},
    )
    return [
        NodeType(
            uuid=node['_key'],
            name=node.get('name'),
            latitude=node.get('latitude'),
            longitude=node.get('longitude'),
            country=node.get('country'),
            country_code=node.get('country_code'),
            synced_at=node.get('synced_at'),
            transport_types=node.get('transport_types') or [],
            edges=[],
            # The query computes this value; previously it was dropped here,
            # so the declared field always resolved to null.
            distance_km=node.get('distance_km'),
        )
        for node in cursor
    ]
def resolve_node_connections(self, info, uuid, limit_auto=12, limit_rail=12):
    """Return a hub's auto edges plus the rail edges of its rail node.

    The rail node is the hub itself when its transport types include
    'rail'; otherwise it is the node with 'rail' that lies nearest to the
    hub's coordinates. Both edge lists are sorted by distance and capped
    by ``limit_auto`` / ``limit_rail``. Returns ``None`` when the hub does
    not exist or the query yields no row.
    """
    db = get_db()
    hub_doc = db.collection('nodes').get(uuid)
    if not hub_doc:
        return None

    query = """
LET auto_edges = (
FOR edge IN edges
FILTER edge._from == @from_id AND edge.transport_type == "auto"
LET to_node = DOCUMENT(edge._to)
FILTER to_node != null
SORT edge.distance_km ASC
LIMIT @limit_auto
RETURN {
to_uuid: to_node._key,
to_name: to_node.name,
to_latitude: to_node.latitude,
to_longitude: to_node.longitude,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds,
transport_type: edge.transport_type
}
)
LET hub_has_rail = @hub_has_rail
LET rail_node = hub_has_rail ? DOCUMENT(@from_id) : FIRST(
FOR node IN nodes
FILTER node.latitude != null AND node.longitude != null
FILTER 'rail' IN node.transport_types
SORT DISTANCE(@hub_lat, @hub_lon, node.latitude, node.longitude)
LIMIT 1
RETURN node
)
LET rail_edges = rail_node == null ? [] : (
FOR edge IN edges
FILTER edge._from == CONCAT("nodes/", rail_node._key) AND edge.transport_type == "rail"
LET to_node = DOCUMENT(edge._to)
FILTER to_node != null
SORT edge.distance_km ASC
LIMIT @limit_rail
RETURN {
to_uuid: to_node._key,
to_name: to_node.name,
to_latitude: to_node.latitude,
to_longitude: to_node.longitude,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds,
transport_type: edge.transport_type
}
)
RETURN {
hub: DOCUMENT(@from_id),
rail_node: rail_node,
auto_edges: auto_edges,
rail_edges: rail_edges
}
"""
    hub_transport = hub_doc.get('transport_types') or []
    params = {
        'from_id': f"nodes/{uuid}",
        'hub_lat': hub_doc.get('latitude'),
        'hub_lon': hub_doc.get('longitude'),
        # Whether the hub is itself a rail node is decided here and passed in.
        'hub_has_rail': 'rail' in hub_transport,
        'limit_auto': limit_auto,
        'limit_rail': limit_rail,
    }
    row = next(db.aql.execute(query, bind_vars=params), None)
    if not row:
        return None

    def as_node(document):
        # A missing document (e.g. no rail node found) maps to None.
        if not document:
            return None
        return NodeType(
            uuid=document['_key'],
            name=document.get('name'),
            latitude=document.get('latitude'),
            longitude=document.get('longitude'),
            country=document.get('country'),
            country_code=document.get('country_code'),
            synced_at=document.get('synced_at'),
            transport_types=document.get('transport_types') or [],
            edges=[],
        )

    def as_edges(items):
        return [EdgeType(**item) for item in items or []]

    return NodeConnectionsType(
        hub=as_node(row.get('hub')),
        rail_node=as_node(row.get('rail_node')),
        auto_edges=as_edges(row.get('auto_edges')),
        rail_edges=as_edges(row.get('rail_edges')),
    )
def resolve_auto_route(self, info, from_lat, from_lon, to_lat, to_lon):
    """Get an auto (car) route between two points via GraphHopper.

    Args:
        info: GraphQL resolve info (unused).
        from_lat: Latitude of the start point.
        from_lon: Longitude of the start point.
        to_lat: Latitude of the end point.
        to_lon: Longitude of the end point.

    Returns:
        A RouteType built from the first path of the response (distance in
        km rounded to 2 decimals, plus the path coordinates), or ``None``
        when the request fails, the body is not valid JSON, or the
        response contains no path.
    """
    url = f"{settings.GRAPHHOPPER_EXTERNAL_URL}/route"
    params = {
        'point': [f"{from_lat},{from_lon}", f"{to_lat},{to_lon}"],
        'profile': 'car',
        'instructions': 'false',
        'calc_points': 'true',
        'points_encoded': 'false',
    }
    try:
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decode errors on requests releases where
        # that error is not a RequestException subclass.
        logger.error("GraphHopper request failed: %s", e)
        return None

    # Tolerate a missing, null or empty ``paths`` entry.
    paths = data.get('paths') if isinstance(data, dict) else None
    if not paths:
        return None

    path = paths[0]
    # The service reports metres; expose kilometres.
    distance_km = round(path.get('distance', 0) / 1000, 2)
    coordinates = path.get('points', {}).get('coordinates', [])
    return RouteType(
        distance_km=distance_km,
        geometry=coordinates,
    )
def resolve_rail_route(self, info, from_lat, from_lon, to_lat, to_lon):
    """Get a rail route between two points via OpenRailRouting.

    Args:
        info: GraphQL resolve info (unused).
        from_lat: Latitude of the start point.
        from_lon: Longitude of the start point.
        to_lat: Latitude of the end point.
        to_lon: Longitude of the end point.

    Returns:
        A RouteType built from the first path of the response (distance in
        km rounded to 2 decimals, plus the path coordinates), or ``None``
        when the request fails, the body is not valid JSON, or the
        response contains no path.
    """
    url = f"{settings.OPENRAILROUTING_EXTERNAL_URL}/route"
    params = {
        'point': [f"{from_lat},{from_lon}", f"{to_lat},{to_lon}"],
        'profile': 'all_tracks',
        'calc_points': 'true',
        'points_encoded': 'false',
    }
    try:
        response = requests.get(url, params=params, timeout=60)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decode errors on requests releases where
        # that error is not a RequestException subclass.
        logger.error("OpenRailRouting request failed: %s", e)
        return None

    # Tolerate a missing, null or empty ``paths`` entry.
    paths = data.get('paths') if isinstance(data, dict) else None
    if not paths:
        return None

    path = paths[0]
    # The service reports metres; expose kilometres.
    distance_km = round(path.get('distance', 0) / 1000, 2)
    coordinates = path.get('points', {}).get('coordinates', [])
    return RouteType(
        distance_km=distance_km,
        geometry=coordinates,
    )
def resolve_clustered_nodes(
    self,
    info,
    west,
    south,
    east,
    north,
    zoom,
    transport_type=None,
    node_type=None,
    product_uuid=None,
    hub_uuid=None,
    supplier_uuid=None,
):
    """Return clusters and single points for the given map viewport.

    Delegates to ``get_clustered_nodes`` with the bounding box, zoom level
    and optional filters, then wraps each returned dict in a
    ClusterPointType.
    """
    viewport = (west, south, east, north, zoom)
    filters = (transport_type, node_type, product_uuid, hub_uuid, supplier_uuid)
    # Arguments are passed positionally, in the same order as the parameters above.
    raw_points = get_clustered_nodes(get_db(), *viewport, *filters)

    points = []
    for raw in raw_points:
        points.append(ClusterPointType(**raw))
    return points
def resolve_products(self, info):
    """List the distinct products referenced by offer nodes.

    Offers are grouped by ``product_uuid``; the product name is taken from
    the first offer of each group. Query errors are logged and produce an
    empty list.
    """
    db = get_db()
    query = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.product_uuid != null
COLLECT product_uuid = node.product_uuid INTO offers
LET first_offer = FIRST(offers).node
RETURN {
uuid: product_uuid,
name: first_offer.product_name
}
"""
    try:
        found = []
        for row in db.aql.execute(query):
            found.append(ProductType(uuid=row['uuid'], name=row.get('name')))
        logger.info("Found %d unique products", len(found))
        return found
    except Exception as exc:
        logger.error("Error getting products: %s", exc)
        return []
def resolve_offers_by_product(self, info, product_uuid):
    """Return every offer node whose ``product_uuid`` matches.

    Query errors are logged and produce an empty list.
    """
    db = get_db()
    query = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.product_uuid == @product_uuid
RETURN node
"""

    def to_offer(doc):
        return OfferNodeType(
            uuid=doc['_key'],
            product_uuid=doc.get('product_uuid'),
            product_name=doc.get('product_name'),
            supplier_uuid=doc.get('supplier_uuid'),
            latitude=doc.get('latitude'),
            longitude=doc.get('longitude'),
            country=doc.get('country'),
            country_code=doc.get('country_code'),
            price_per_unit=doc.get('price_per_unit'),
            currency=doc.get('currency'),
            quantity=doc.get('quantity'),
            unit=doc.get('unit'),
        )

    try:
        rows = db.aql.execute(query, bind_vars={'product_uuid': product_uuid})
        matches = [to_offer(doc) for doc in rows]
        logger.info("Found %d offers for product %s", len(matches), product_uuid)
        return matches
    except Exception as exc:
        logger.error("Error getting offers by product: %s", exc)
        return []
def resolve_hubs_near_offer(self, info, offer_uuid, limit=12):
    """Get the hubs nearest to an offer's location.

    Args:
        info: GraphQL resolve info (unused).
        offer_uuid: Key of the offer document in the ``nodes`` collection.
        limit: Maximum number of hubs to return.

    Returns:
        NodeType objects ordered by ascending distance from the offer,
        each carrying ``distance_km`` and an empty ``edges`` list. Empty
        when the offer is unknown, has no coordinates, or the query fails
        (the error is logged).
    """
    db = get_db()
    offer = db.collection('nodes').get(offer_uuid)
    if not offer:
        logger.info("Offer %s not found", offer_uuid)
        return []

    lat = offer.get('latitude')
    lon = offer.get('longitude')
    if lat is None or lon is None:
        logger.info("Offer %s has no coordinates", offer_uuid)
        return []

    # DISTANCE() yields metres; dividing by 1000 gives kilometres.
    aql = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
FILTER node.product_uuid == null
FILTER node.latitude != null AND node.longitude != null
LET dist = DISTANCE(node.latitude, node.longitude, @lat, @lon) / 1000
SORT dist ASC
LIMIT @limit
RETURN MERGE(node, {distance_km: dist})
"""
    try:
        cursor = db.aql.execute(aql, bind_vars={'lat': lat, 'lon': lon, 'limit': limit})
        hubs = [
            NodeType(
                uuid=node['_key'],
                name=node.get('name'),
                latitude=node.get('latitude'),
                longitude=node.get('longitude'),
                country=node.get('country'),
                country_code=node.get('country_code'),
                synced_at=node.get('synced_at'),
                transport_types=node.get('transport_types') or [],
                edges=[],
                # The query computes this value; previously it was dropped
                # here, so the declared field always resolved to null.
                distance_km=node.get('distance_km'),
            )
            for node in cursor
        ]
        logger.info("Found %d hubs near offer %s", len(hubs), offer_uuid)
        return hubs
    except Exception as e:
        logger.error("Error getting hubs near offer: %s", e)
        return []
def resolve_suppliers(self, info):
    """List the distinct suppliers referenced by offer nodes.

    Only ``uuid`` is populated on the returned SupplierType objects.
    Query errors are logged and produce an empty list.
    """
    db = get_db()
    query = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.supplier_uuid != null
COLLECT supplier_uuid = node.supplier_uuid
RETURN { uuid: supplier_uuid }
"""
    try:
        found = []
        for row in db.aql.execute(query):
            found.append(SupplierType(uuid=row['uuid']))
        logger.info("Found %d unique suppliers", len(found))
        return found
    except Exception as exc:
        logger.error("Error getting suppliers: %s", exc)
        return []
def resolve_products_by_supplier(self, info, supplier_uuid):
    """List the distinct products a supplier has offers for.

    Offers of the supplier are grouped by ``product_uuid``; the product
    name comes from the first offer of each group. Query errors are logged
    and produce an empty list.
    """
    db = get_db()
    query = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.supplier_uuid == @supplier_uuid
FILTER node.product_uuid != null
COLLECT product_uuid = node.product_uuid INTO offers
LET first_offer = FIRST(offers).node
RETURN {
uuid: product_uuid,
name: first_offer.product_name
}
"""
    try:
        rows = db.aql.execute(query, bind_vars={'supplier_uuid': supplier_uuid})
        found = []
        for row in rows:
            found.append(ProductType(uuid=row['uuid'], name=row.get('name')))
        logger.info("Found %d products for supplier %s", len(found), supplier_uuid)
        return found
    except Exception as exc:
        logger.error("Error getting products by supplier: %s", exc)
        return []
def resolve_offers_by_supplier_product(self, info, supplier_uuid, product_uuid):
    """Return a supplier's offer nodes for one specific product.

    Query errors are logged and produce an empty list.
    """
    db = get_db()
    query = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.supplier_uuid == @supplier_uuid
FILTER node.product_uuid == @product_uuid
RETURN node
"""

    def to_offer(doc):
        return OfferNodeType(
            uuid=doc['_key'],
            product_uuid=doc.get('product_uuid'),
            product_name=doc.get('product_name'),
            supplier_uuid=doc.get('supplier_uuid'),
            latitude=doc.get('latitude'),
            longitude=doc.get('longitude'),
            country=doc.get('country'),
            country_code=doc.get('country_code'),
            price_per_unit=doc.get('price_per_unit'),
            currency=doc.get('currency'),
            quantity=doc.get('quantity'),
            unit=doc.get('unit'),
        )

    try:
        params = {
            'supplier_uuid': supplier_uuid,
            'product_uuid': product_uuid
        }
        matches = [to_offer(doc) for doc in db.aql.execute(query, bind_vars=params)]
        logger.info("Found %d offers for supplier %s product %s", len(matches), supplier_uuid, product_uuid)
        return matches
    except Exception as exc:
        logger.error("Error getting offers by supplier product: %s", exc)
        return []
def resolve_products_near_hub(self, info, hub_uuid, radius_km=500):
    """Return the distinct products offered within ``radius_km`` of a hub.

    The hub document is loaded first; an unknown hub or a hub without
    coordinates yields an empty list.  Offers are then filtered by their
    distance to the hub and grouped by ``product_uuid``.  A failure while
    running the query is logged and reported as an empty list.
    """
    database = get_db()
    hub_doc = database.collection('nodes').get(hub_uuid)
    if not hub_doc:
        logger.info("Hub %s not found", hub_uuid)
        return []

    hub_lat, hub_lon = hub_doc.get('latitude'), hub_doc.get('longitude')
    if hub_lat is None or hub_lon is None:
        logger.info("Hub %s has no coordinates", hub_uuid)
        return []

    query = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.product_uuid != null
FILTER node.latitude != null AND node.longitude != null
LET dist = DISTANCE(node.latitude, node.longitude, @lat, @lon) / 1000
FILTER dist <= @radius_km
COLLECT product_uuid = node.product_uuid INTO offers
LET first_offer = FIRST(offers).node
RETURN {
uuid: product_uuid,
name: first_offer.product_name
}
"""
    params = {'lat': hub_lat, 'lon': hub_lon, 'radius_km': radius_km}
    try:
        products = []
        for row in database.aql.execute(query, bind_vars=params):
            products.append(ProductType(uuid=row['uuid'], name=row.get('name')))
        logger.info("Found %d products near hub %s", len(products), hub_uuid)
        return products
    except Exception as e:
        logger.error("Error getting products near hub: %s", e)
        return []
def resolve_suppliers_for_product(self, info, product_uuid):
    """Return the distinct suppliers that have at least one offer for a product.

    Only the supplier uuid is filled in on the returned SupplierType objects.
    A failure while running the query is logged and reported as an empty list.
    """
    database = get_db()
    query = """
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.product_uuid == @product_uuid
FILTER node.supplier_uuid != null
COLLECT supplier_uuid = node.supplier_uuid
RETURN { uuid: supplier_uuid }
"""
    try:
        rows = database.aql.execute(query, bind_vars={'product_uuid': product_uuid})
        suppliers = []
        for row in rows:
            suppliers.append(SupplierType(uuid=row['uuid']))
        logger.info("Found %d suppliers for product %s", len(suppliers), product_uuid)
        return suppliers
    except Exception as e:
        logger.error("Error getting suppliers for product: %s", e)
        return []
def resolve_hubs_for_product(self, info, product_uuid, radius_km=500):
    """Return hubs lying within ``radius_km`` of any offer of the product.

    A hub here is a node whose ``node_type`` is 'logistics' or unset and that
    has coordinates.  Every offer of the product is paired with every hub,
    pairs farther apart than the radius are dropped, and the surviving hubs
    are de-duplicated.  A failure while running the query is logged and
    reported as an empty list.
    """
    database = get_db()
    query = """
FOR offer IN nodes
FILTER offer.node_type == 'offer'
FILTER offer.product_uuid == @product_uuid
FILTER offer.latitude != null AND offer.longitude != null
FOR hub IN nodes
FILTER hub.node_type == 'logistics' OR hub.node_type == null
FILTER hub.latitude != null AND hub.longitude != null
LET dist = DISTANCE(offer.latitude, offer.longitude, hub.latitude, hub.longitude) / 1000
FILTER dist <= @radius_km
COLLECT hub_uuid = hub._key, hub_name = hub.name,
hub_lat = hub.latitude, hub_lon = hub.longitude,
hub_country = hub.country, hub_country_code = hub.country_code,
hub_transport = hub.transport_types
RETURN {
uuid: hub_uuid,
name: hub_name,
latitude: hub_lat,
longitude: hub_lon,
country: hub_country,
country_code: hub_country_code,
transport_types: hub_transport
}
"""
    params = {'product_uuid': product_uuid, 'radius_km': radius_km}
    try:
        hubs = []
        for row in database.aql.execute(query, bind_vars=params):
            hubs.append(NodeType(
                uuid=row['uuid'],
                name=row['name'],
                latitude=row['latitude'],
                longitude=row['longitude'],
                country=row['country'],
                country_code=row.get('country_code'),
                transport_types=row.get('transport_types')
            ))
        logger.info("Found %d hubs for product %s", len(hubs), product_uuid)
        return hubs
    except Exception as e:
        logger.error("Error getting hubs for product: %s", e)
        return []
def resolve_hubs_for_product_graph(self, info, product_uuid, limit=500):
    """Get hubs that can be reached by graph routes for a product.

    Every rail/sea logistics node with coordinates is probed with
    ``resolve_offers_by_hub(..., limit=1)``; hubs for which that search finds
    no offer are dropped.  The remaining hubs are ordered by the
    ``distance_km`` of the route option found (hubs without a distance go
    last) and the first ``limit`` are returned as NodeType objects carrying
    that distance.

    A failure for a single hub is logged and that hub is skipped; a failure
    of the hub query itself is logged and reported as an empty list.
    """
    db = get_db()
    ensure_graph()
    aql = """
FOR hub IN nodes
FILTER hub.node_type == 'logistics' OR hub.node_type == null
LET types = hub.transport_types != null ? hub.transport_types : []
FILTER ('rail' IN types) OR ('sea' IN types)
FILTER hub.latitude != null AND hub.longitude != null
RETURN hub
"""
    try:
        cursor = db.aql.execute(aql)
        hubs_with_distance = []
        for hub in cursor:
            hub_uuid = hub.get('_key')
            if not hub_uuid:
                continue
            try:
                # Graphene passes the root value (None unless configured) as
                # ``self``, so ``self.resolve_offers_by_hub`` would raise for
                # every hub.  Go through the class instead, as the other
                # resolvers in this file do.
                routes = Query.resolve_offers_by_hub(
                    self, info, hub_uuid, product_uuid, limit=1
                )
            except Exception as route_error:
                logger.error("Error resolving offers for hub %s: %s", hub_uuid, route_error)
                continue
            if not routes:
                continue
            distance_km = routes[0].distance_km if routes[0] else None
            hubs_with_distance.append((distance_km, hub))

        # Sort by the reported distance when available; unknown distances last.
        hubs_with_distance.sort(key=lambda item: (item[0] is None, item[0] or 0))

        hubs = [
            NodeType(
                uuid=hub.get('_key'),
                name=hub.get('name'),
                latitude=hub.get('latitude'),
                longitude=hub.get('longitude'),
                country=hub.get('country'),
                country_code=hub.get('country_code'),
                transport_types=hub.get('transport_types'),
                distance_km=distance_km,
            )
            for distance_km, hub in hubs_with_distance[:limit]
        ]
        logger.info("Found %d graph-reachable hubs for product %s", len(hubs), product_uuid)
        return hubs
    except Exception as e:
        logger.error("Error getting graph hubs for product %s: %s", product_uuid, e)
        return []
def resolve_offers_by_hub(self, info, hub_uuid, product_uuid, limit=10):
    """
    Get offers for a product together with a route to the hub.

    The search starts at the hub and expands outward, cheapest state first,
    under the phase rule auto -> rail* -> auto.  A search state is the pair
    (node key, phase).  An edge costs its ``travel_time_seconds`` when that
    is set, otherwise its ``distance_km`` (or 0).  Each popped node whose
    ``product_uuid`` equals the requested one is reported as an offer and is
    not expanded further.

    Returns up to ``limit`` ProductRouteOptionType objects, or an empty list
    when the hub is unknown, has no coordinates, or no offer was reached
    within Query.MAX_EXPANSIONS expansions.
    """
    db = get_db()
    ensure_graph()

    hub = db.collection('nodes').get(hub_uuid)
    if not hub:
        logger.info("Hub %s not found", hub_uuid)
        return []

    hub_lat, hub_lon = hub.get('latitude'), hub.get('longitude')
    if hub_lat is None or hub_lon is None:
        logger.info("Hub %s missing coordinates", hub_uuid)
        return []

    # Phase-based routing: auto -> rail* -> auto.
    # Phases widen the search radius; a rail leg is not mandatory:
    # - end_auto: may take one auto leg, rail, or go straight to an offer
    # - end_auto_done: the auto leg is used up - rail or offer
    # - rail: any number of rail legs, then one auto leg or an offer
    # - start_auto_done: the auto leg is used up - offer only
    phase_transitions = {
        'end_auto': {'offer': 'offer', 'auto': 'end_auto_done', 'rail': 'rail'},
        'end_auto_done': {'offer': 'offer', 'rail': 'rail'},
        'rail': {'offer': 'offer', 'rail': 'rail', 'auto': 'start_auto_done'},
        'start_auto_done': {'offer': 'offer'},
    }
    # Edge types worth fetching from the database in each phase.
    phase_edge_types = {
        'end_auto': ['auto', 'rail', 'offer'],
        'end_auto_done': ['rail', 'offer'],
        'rail': ['rail', 'auto', 'offer'],
        'start_auto_done': ['offer'],
    }

    def allowed_next_phase(current_phase, transport_type):
        """Return the phase entered by taking this edge, or None if forbidden."""
        return phase_transitions.get(current_phase, {}).get(transport_type)

    def fetch_neighbors(node_key, phase):
        """Get neighbors based on allowed transport types for phase."""
        aql = """
FOR edge IN edges
FILTER edge.transport_type IN @types
FILTER edge._from == @node_id OR edge._to == @node_id
LET neighbor_id = edge._from == @node_id ? edge._to : edge._from
LET neighbor = DOCUMENT(neighbor_id)
FILTER neighbor != null
RETURN {
neighbor_key: neighbor._key,
neighbor_doc: neighbor,
from_id: edge._from,
to_id: edge._to,
transport_type: edge.transport_type,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds
}
"""
        params = {
            'node_id': f"nodes/{node_key}",
            'types': list(phase_edge_types.get(phase, ['offer'])),
        }
        return list(db.aql.execute(aql, bind_vars=params))

    best_cost = {}
    came_from = {}
    node_docs = {hub_uuid: hub}

    def trace_path(state):
        """Walk predecessor links from ``state`` back to the hub."""
        legs = []
        while state in came_from:
            previous_state, edge_info = came_from[state]
            legs.append((state[0], previous_state[0], edge_info))
            state = previous_state
        return legs

    # Heap entries: (cost, seq, node_key, phase); seq breaks cost ties.
    frontier = []
    sequence = 0
    heapq.heappush(frontier, (0, sequence, hub_uuid, 'end_auto'))

    found_routes = []
    expansions = 0

    while frontier and len(found_routes) < limit and expansions < Query.MAX_EXPANSIONS:
        cost, _, node_key, phase = heapq.heappop(frontier)
        state = (node_key, phase)

        # Stale heap entry: a cheaper way into this state is already known.
        if state in best_cost and cost > best_cost[state]:
            continue

        # Found an offer for the product
        node_doc = node_docs.get(node_key)
        if node_doc and node_doc.get('product_uuid') == product_uuid:
            route = _build_route_from_edges(trace_path(state), node_docs)

            src_lat = node_doc.get('latitude')
            src_lon = node_doc.get('longitude')
            offer_distance = None
            if src_lat is not None and src_lon is not None:
                offer_distance = _distance_km(src_lat, src_lon, hub_lat, hub_lon)

            found_routes.append(ProductRouteOptionType(
                source_uuid=node_key,
                source_name=node_doc.get('name') or node_doc.get('product_name'),
                source_lat=node_doc.get('latitude'),
                source_lon=node_doc.get('longitude'),
                distance_km=offer_distance,
                routes=[route] if route else [],
            ))
            continue

        neighbors = fetch_neighbors(node_key, phase)
        expansions += 1

        for neighbor in neighbors:
            next_phase = allowed_next_phase(phase, neighbor.get('transport_type'))
            if next_phase is None:
                continue

            travel_time = neighbor.get('travel_time_seconds')
            edge_km = neighbor.get('distance_km')
            neighbor_key = neighbor.get('neighbor_key')
            node_docs[neighbor_key] = neighbor.get('neighbor_doc')

            if travel_time is not None:
                step_cost = travel_time
            else:
                step_cost = edge_km or 0
            new_cost = cost + step_cost

            next_state = (neighbor_key, next_phase)
            if next_state in best_cost and new_cost >= best_cost[next_state]:
                continue

            best_cost[next_state] = new_cost
            sequence += 1
            heapq.heappush(frontier, (new_cost, sequence, neighbor_key, next_phase))
            came_from[next_state] = (state, neighbor)

    if not found_routes:
        logger.info("No offers found for product %s near hub %s", product_uuid, hub_uuid)
        return []

    logger.info("Found %d offers for product %s near hub %s", len(found_routes), product_uuid, hub_uuid)
    return found_routes
def resolve_offer_to_hub(self, info, offer_uuid, hub_uuid):
    """
    Get the route between one specific offer and a hub.

    Uses the same phase-based search (auto -> rail* -> auto) as the
    offers-by-hub lookup: it starts at the hub and expands cheapest state
    first until the offer node is popped.  An edge costs its
    ``travel_time_seconds`` when set, otherwise its ``distance_km`` (or 0).

    Returns a ProductRouteOptionType for the offer, or None when the offer
    or the hub is unknown or the offer was not reached within
    Query.MAX_EXPANSIONS expansions.
    """
    db = get_db()
    ensure_graph()
    nodes_col = db.collection('nodes')

    offer = nodes_col.get(offer_uuid)
    if not offer:
        logger.info("Offer %s not found", offer_uuid)
        return None

    hub = nodes_col.get(hub_uuid)
    if not hub:
        logger.info("Hub %s not found", hub_uuid)
        return None

    hub_lat, hub_lon = hub.get('latitude'), hub.get('longitude')
    offer_lat, offer_lon = offer.get('latitude'), offer.get('longitude')

    # Phase-based routing from hub to offer: which phase each edge type
    # leads to; a missing entry means the edge may not be taken.
    phase_transitions = {
        'end_auto': {'offer': 'offer', 'auto': 'end_auto_done', 'rail': 'rail'},
        'end_auto_done': {'offer': 'offer', 'rail': 'rail'},
        'rail': {'offer': 'offer', 'rail': 'rail', 'auto': 'start_auto_done'},
        'start_auto_done': {'offer': 'offer'},
    }
    # Edge types worth fetching from the database in each phase.
    phase_edge_types = {
        'end_auto': ['auto', 'rail', 'offer'],
        'end_auto_done': ['rail', 'offer'],
        'rail': ['rail', 'auto', 'offer'],
        'start_auto_done': ['offer'],
    }

    def allowed_next_phase(current_phase, transport_type):
        """Return the phase entered by taking this edge, or None if forbidden."""
        return phase_transitions.get(current_phase, {}).get(transport_type)

    def fetch_neighbors(node_key, phase):
        """Load the edges usable in ``phase`` that touch ``node_key``."""
        aql = """
FOR edge IN edges
FILTER edge.transport_type IN @types
FILTER edge._from == @node_id OR edge._to == @node_id
LET neighbor_id = edge._from == @node_id ? edge._to : edge._from
LET neighbor = DOCUMENT(neighbor_id)
FILTER neighbor != null
RETURN {
neighbor_key: neighbor._key,
neighbor_doc: neighbor,
from_id: edge._from,
to_id: edge._to,
transport_type: edge.transport_type,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds
}
"""
        params = {
            'node_id': f"nodes/{node_key}",
            'types': list(phase_edge_types.get(phase, ['offer'])),
        }
        return list(db.aql.execute(aql, bind_vars=params))

    best_cost = {}
    came_from = {}
    node_docs = {hub_uuid: hub, offer_uuid: offer}

    # Heap entries: (cost, seq, node_key, phase); seq breaks cost ties.
    frontier = []
    sequence = 0
    heapq.heappush(frontier, (0, sequence, hub_uuid, 'end_auto'))
    expansions = 0

    while frontier and expansions < Query.MAX_EXPANSIONS:
        cost, _, node_key, phase = heapq.heappop(frontier)
        state = (node_key, phase)

        # Stale heap entry: a cheaper way into this state is already known.
        if state in best_cost and cost > best_cost[state]:
            continue

        # Found the specific offer
        if node_key == offer_uuid:
            # Follow predecessor links from the offer back to the hub.
            legs = []
            cursor_state = state
            while cursor_state in came_from:
                previous_state, edge_info = came_from[cursor_state]
                legs.append((cursor_state[0], previous_state[0], edge_info))
                cursor_state = previous_state

            route = _build_route_from_edges(legs, node_docs)

            have_coordinates = (
                offer_lat is not None and offer_lon is not None
                and hub_lat is not None and hub_lon is not None
            )
            offer_distance = None
            if have_coordinates:
                offer_distance = _distance_km(offer_lat, offer_lon, hub_lat, hub_lon)

            return ProductRouteOptionType(
                source_uuid=offer_uuid,
                source_name=offer.get('name') or offer.get('product_name'),
                source_lat=offer_lat,
                source_lon=offer_lon,
                distance_km=offer_distance,
                routes=[route] if route else [],
            )

        neighbors = fetch_neighbors(node_key, phase)
        expansions += 1

        for neighbor in neighbors:
            next_phase = allowed_next_phase(phase, neighbor.get('transport_type'))
            if next_phase is None:
                continue

            travel_time = neighbor.get('travel_time_seconds')
            edge_km = neighbor.get('distance_km')
            neighbor_key = neighbor.get('neighbor_key')
            node_docs[neighbor_key] = neighbor.get('neighbor_doc')

            if travel_time is not None:
                step_cost = travel_time
            else:
                step_cost = edge_km or 0
            new_cost = cost + step_cost

            next_state = (neighbor_key, next_phase)
            if next_state in best_cost and new_cost >= best_cost[next_state]:
                continue

            best_cost[next_state] = new_cost
            sequence += 1
            heapq.heappush(frontier, (new_cost, sequence, neighbor_key, next_phase))
            came_from[next_state] = (state, neighbor)

    logger.info("No route found from offer %s to hub %s", offer_uuid, hub_uuid)
    return None
def resolve_nearest_hubs(self, info, lat, lon, radius=1000, product_uuid=None, source_uuid=None, use_graph=False, limit=12):
    """Find nearest hubs to coordinates, optionally filtered by product availability.

    Strategies, tried in this order:

    1. ``source_uuid`` given and found: cheapest-first walk over
       auto/rail/offer edges starting at that node.  The first ``limit``
       rail/sea logistics nodes settled are returned (no ``distance_km``).
       If the source node does not exist, a warning is logged and the
       lookup falls through to the strategies below.
    2. ``product_uuid`` together with ``use_graph``: delegate to
       ``resolve_hubs_for_product_graph``.
    3. ``product_uuid`` only: rail/sea hubs within ``radius`` of (lat, lon),
       provided at least one offer of the product also lies within
       ``radius``; nearest first.
    4. Otherwise: rail/sea hubs within ``radius`` of (lat, lon), nearest
       first.

    Strategies 3 and 4 log any query failure and return an empty list.
    """
    db = get_db()

    # Graph-based nearest hubs when source_uuid provided
    if source_uuid:
        nodes_col = db.collection('nodes')
        start = nodes_col.get(source_uuid)
        if not start:
            logger.warning("Source node %s not found for nearest hubs, falling back to radius search", source_uuid)
            source_uuid = None
        else:
            def is_target_hub(doc):
                """True for a rail/sea logistics node other than the source."""
                if doc.get('_key') == source_uuid:
                    return False
                if doc.get('node_type') not in ('logistics', None):
                    return False
                types = doc.get('transport_types') or []
                return ('rail' in types) or ('sea' in types)

            def fetch_neighbors(node_key):
                """Load every auto/rail/offer edge touching ``node_key``."""
                aql = """
FOR edge IN edges
FILTER edge.transport_type IN ['auto', 'rail', 'offer']
FILTER edge._from == @node_id OR edge._to == @node_id
LET neighbor_id = edge._from == @node_id ? edge._to : edge._from
LET neighbor = DOCUMENT(neighbor_id)
FILTER neighbor != null
RETURN {
neighbor_key: neighbor._key,
neighbor_doc: neighbor,
transport_type: edge.transport_type,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds
}
"""
                cursor = db.aql.execute(
                    aql,
                    bind_vars={'node_id': f"nodes/{node_key}"},
                )
                return list(cursor)

            # Heap entries: (cost, seq, node_key); seq breaks cost ties.
            queue = []
            counter = 0
            heapq.heappush(queue, (0, counter, source_uuid))
            # Nodes already popped once.  The same node can sit in the heap
            # several times (even at equal cost, e.g. via parallel edges);
            # only its first pop may be processed, otherwise a hub would be
            # appended to ``found`` more than once.
            settled = set()
            node_docs = {source_uuid: start}
            found = []
            expansions = 0

            while queue and len(found) < limit and expansions < Query.MAX_EXPANSIONS:
                cost, _, node_key = heapq.heappop(queue)
                if node_key in settled:
                    continue
                settled.add(node_key)

                node_doc = node_docs.get(node_key)
                if node_doc and is_target_hub(node_doc):
                    found.append(node_doc)
                    if len(found) >= limit:
                        break

                neighbors = fetch_neighbors(node_key)
                expansions += 1
                for neighbor in neighbors:
                    neighbor_key = neighbor.get('neighbor_key')
                    if not neighbor_key:
                        continue
                    node_docs[neighbor_key] = neighbor.get('neighbor_doc')
                    if neighbor_key in settled:
                        continue
                    step_cost = neighbor.get('travel_time_seconds') or neighbor.get('distance_km') or 0
                    counter += 1
                    heapq.heappush(queue, (cost + step_cost, counter, neighbor_key))

            return [
                NodeType(
                    uuid=node.get('_key'),
                    name=node.get('name'),
                    latitude=node.get('latitude'),
                    longitude=node.get('longitude'),
                    country=node.get('country'),
                    country_code=node.get('country_code'),
                    synced_at=node.get('synced_at'),
                    transport_types=node.get('transport_types') or [],
                    edges=[],
                )
                for node in found
            ]

    if product_uuid and use_graph:
        # Graphene passes the root value (None unless configured) as
        # ``self``, so the sibling resolver is reached through the class.
        return Query.resolve_hubs_for_product_graph(self, info, product_uuid, limit=limit)

    if product_uuid:
        # Find hubs that have offers for this product within radius.
        # COLLECT ... INTO yields an array of objects keyed by variable name,
        # so the hub document is FIRST(hub_group).hub.
        aql = """
FOR offer IN nodes
FILTER offer.node_type == 'offer'
FILTER offer.product_uuid == @product_uuid
FILTER offer.latitude != null AND offer.longitude != null
LET dist_to_offer = DISTANCE(offer.latitude, offer.longitude, @lat, @lon) / 1000
FILTER dist_to_offer <= @radius
FOR hub IN nodes
FILTER hub.node_type == 'logistics' OR hub.node_type == null
FILTER hub.product_uuid == null
LET types = hub.transport_types != null ? hub.transport_types : []
FILTER ('rail' IN types) OR ('sea' IN types)
FILTER hub.latitude != null AND hub.longitude != null
LET dist_to_hub = DISTANCE(hub.latitude, hub.longitude, @lat, @lon) / 1000
FILTER dist_to_hub <= @radius
COLLECT hub_uuid = hub._key INTO hub_group
LET first_hub = FIRST(hub_group).hub
LET hub_dist = DISTANCE(first_hub.latitude, first_hub.longitude, @lat, @lon) / 1000
SORT hub_dist ASC
LIMIT @limit
RETURN MERGE(first_hub, {_key: hub_uuid, distance_km: hub_dist})
"""
        bind_vars = {'lat': lat, 'lon': lon, 'radius': radius, 'product_uuid': product_uuid, 'limit': limit}
    else:
        # Simple nearest hubs search
        aql = """
FOR hub IN nodes
FILTER hub.node_type == 'logistics' OR hub.node_type == null
FILTER hub.product_uuid == null
LET types = hub.transport_types != null ? hub.transport_types : []
FILTER ('rail' IN types) OR ('sea' IN types)
FILTER hub.latitude != null AND hub.longitude != null
LET dist = DISTANCE(hub.latitude, hub.longitude, @lat, @lon) / 1000
FILTER dist <= @radius
SORT dist ASC
LIMIT @limit
RETURN MERGE(hub, {distance_km: dist})
"""
        bind_vars = {'lat': lat, 'lon': lon, 'radius': radius, 'limit': limit}

    try:
        cursor = db.aql.execute(aql, bind_vars=bind_vars)
        hubs = [
            NodeType(
                uuid=node['_key'],
                name=node.get('name'),
                latitude=node.get('latitude'),
                longitude=node.get('longitude'),
                country=node.get('country'),
                country_code=node.get('country_code'),
                synced_at=node.get('synced_at'),
                transport_types=node.get('transport_types') or [],
                edges=[],
                distance_km=node.get('distance_km'),
            )
            for node in cursor
        ]
        logger.info("Found %d hubs near (%.3f, %.3f) within %d km", len(hubs), lat, lon, radius)
        return hubs
    except Exception as e:
        logger.error("Error finding nearest hubs: %s", e)
        return []
def resolve_nearest_offers(self, info, lat, lon, radius=500, product_uuid=None, hub_uuid=None, limit=50):
    """Find offers near coordinates, optionally for one product, optionally with routes.

    With both ``hub_uuid`` and ``product_uuid`` the offers come from the
    graph search around the hub and only offers that have a route are kept.
    Otherwise the offers within ``radius`` of (lat, lon) are returned nearest
    first; if ``hub_uuid`` is given and the hub exists, a route to it is
    looked up for each offer.  Failures are logged and reported as an empty
    list.
    """
    db = get_db()
    ensure_graph()

    def as_offer(node, distance_km, routes):
        """Map an offer document onto OfferWithRouteType."""
        return OfferWithRouteType(
            uuid=node['_key'],
            product_uuid=node.get('product_uuid'),
            product_name=node.get('product_name'),
            supplier_uuid=node.get('supplier_uuid'),
            supplier_name=node.get('supplier_name'),
            latitude=node.get('latitude'),
            longitude=node.get('longitude'),
            country=node.get('country'),
            country_code=node.get('country_code'),
            price_per_unit=node.get('price_per_unit'),
            currency=node.get('currency'),
            quantity=node.get('quantity'),
            unit=node.get('unit'),
            distance_km=distance_km,
            routes=routes,
        )

    # If hub_uuid + product_uuid provided, use graph search to return only offers with routes.
    if hub_uuid and product_uuid:
        try:
            nodes_col = db.collection('nodes')
            # Ask for more options than needed: some are dropped below.
            expanded_limit = max(limit * 5, limit)
            route_options = Query.resolve_offers_by_hub(
                Query, info, hub_uuid, product_uuid, expanded_limit
            )
            routed_offers = []
            for option in route_options or []:
                if not option.routes:
                    continue
                offer_doc = nodes_col.get(option.source_uuid)
                if not offer_doc:
                    continue
                routed_offers.append(as_offer(offer_doc, option.distance_km, option.routes))
                if len(routed_offers) >= limit:
                    break
            logger.info("Found %d offers by graph for hub %s", len(routed_offers), hub_uuid)
            return routed_offers
        except Exception as e:
            logger.error("Error finding offers by hub %s: %s", hub_uuid, e)
            return []

    bind_vars = {'lat': lat, 'lon': lon, 'radius': radius, 'limit': limit}
    aql = """
FOR offer IN nodes
FILTER offer.node_type == 'offer'
FILTER offer.product_uuid != null
FILTER offer.latitude != null AND offer.longitude != null
"""
    if product_uuid:
        aql += " FILTER offer.product_uuid == @product_uuid\n"
        bind_vars['product_uuid'] = product_uuid
    aql += """
LET dist = DISTANCE(offer.latitude, offer.longitude, @lat, @lon) / 1000
FILTER dist <= @radius
SORT dist ASC
LIMIT @limit
RETURN MERGE(offer, {distance_km: dist})
"""

    try:
        offer_nodes = list(db.aql.execute(aql, bind_vars=bind_vars))
        logger.info("Found %d offers near (%.3f, %.3f) within %d km", len(offer_nodes), lat, lon, radius)

        # If hub_uuid provided, calculate routes for each offer
        if hub_uuid:
            hub = db.collection('nodes').get(hub_uuid)
            if not hub:
                logger.warning("Hub %s not found for route calculation", hub_uuid)
                hub_uuid = None

        offers = []
        for node in offer_nodes:
            routes = []
            # Calculate route to hub if hub_uuid provided
            if hub_uuid:
                route_result = Query.resolve_offer_to_hub(Query, info, node['_key'], hub_uuid)
                if route_result and route_result.routes:
                    routes = route_result.routes
            offers.append(as_offer(node, node.get('distance_km'), routes))
        return offers
    except Exception as e:
        logger.error("Error finding nearest offers: %s", e)
        return []
def resolve_quote_calculations(self, info, lat, lon, radius=500, product_uuid=None, hub_uuid=None, quantity=None, limit=10):
    """Return calculations as arrays of offers. If quantity provided, may return split offers.

    Offers are fetched through ``resolve_nearest_offers`` (up to four times
    ``limit``) and ordered by price, then distance; offers without a
    parsable price go last.

    * Without a positive ``quantity`` each of the first ``limit`` offers
      becomes its own single-offer calculation.
    * With a quantity, the result holds (a) the first offer in that order
      whose quantity covers the request, if any, and (b) a two-offer split
      made of the first offer plus the first later offer covering the
      remainder (or simply the second offer when none does).

    Returns a list of OfferCalculationType, empty when no offers were found.
    """
    def _parse_number(value):
        """Parse a number that may use a decimal comma; None if unparsable."""
        if value is None:
            return None
        try:
            return float(str(value).replace(',', '.'))
        except (TypeError, ValueError):
            return None

    def _offer_quantity(offer):
        return _parse_number(getattr(offer, 'quantity', None))

    def _offer_price(offer):
        return _parse_number(getattr(offer, 'price_per_unit', None))

    def _offer_distance(offer):
        """Offer distance, falling back to its first route's total; 0 if unknown."""
        if getattr(offer, 'distance_km', None) is not None:
            return offer.distance_km or 0
        if getattr(offer, 'routes', None):
            route = offer.routes[0] if offer.routes else None
            if route and route.total_distance_km is not None:
                return route.total_distance_km or 0
        return 0

    def _sort_key(offer):
        # Parse the price once per offer; a missing price sorts last.
        price = _offer_price(offer)
        return (price if price is not None else float('inf'), _offer_distance(offer))

    # Graphene passes the root value (None unless configured) as ``self``,
    # so ``self.resolve_nearest_offers`` would raise.  Reach the sibling
    # resolver through the class, as the other resolvers in this file do.
    offers = Query.resolve_nearest_offers(
        self,
        info,
        lat=lat,
        lon=lon,
        radius=radius,
        product_uuid=product_uuid,
        hub_uuid=hub_uuid,
        limit=max(limit * 4, limit),
    )
    if not offers:
        return []

    quantity_value = _parse_number(quantity)
    offers_sorted = sorted(offers, key=_sort_key)

    # If no requested quantity, return single-offer calculations.
    if not quantity_value or quantity_value <= 0:
        return [OfferCalculationType(offers=[offer]) for offer in offers_sorted[:limit]]

    calculations = []

    # Try single offer that covers quantity.
    single = next(
        (offer for offer in offers_sorted if (_offer_quantity(offer) or 0) >= quantity_value),
        None,
    )
    if single:
        calculations.append(OfferCalculationType(offers=[single]))

    # Build a split offer (two offers) if possible.
    if len(offers_sorted) >= 2:
        primary = offers_sorted[0]
        remaining = quantity_value - (_offer_quantity(primary) or 0)
        if remaining <= 0:
            secondary = offers_sorted[1]
        else:
            secondary = next(
                (offer for offer in offers_sorted[1:] if (_offer_quantity(offer) or 0) >= remaining),
                offers_sorted[1],
            )
        if secondary:
            calculations.append(OfferCalculationType(offers=[primary, secondary]))

    # Fallback: ensure at least one calculation.
    if not calculations:
        calculations.append(OfferCalculationType(offers=[offers_sorted[0]]))

    return calculations[:limit]
def resolve_nearest_suppliers(self, info, lat, lon, radius=1000, product_uuid=None, limit=12):
    """Return suppliers that have offers within ``radius`` of the coordinates.

    Offers inside the radius (optionally only those of ``product_uuid``) are
    grouped by supplier.  The distance reported for a supplier is that of
    the first offer in its group.  Name and coordinates come from the
    supplier node when one exists, otherwise from that offer.  A failure
    while running the query is logged and reported as an empty list.
    """
    database = get_db()
    params = {'lat': lat, 'lon': lon, 'radius': radius, 'limit': limit}

    query = """
FOR offer IN nodes
FILTER offer.node_type == 'offer'
FILTER offer.supplier_uuid != null
FILTER offer.latitude != null AND offer.longitude != null
"""
    if product_uuid:
        query += " FILTER offer.product_uuid == @product_uuid\n"
        params['product_uuid'] = product_uuid
    query += """
LET dist = DISTANCE(offer.latitude, offer.longitude, @lat, @lon) / 1000
FILTER dist <= @radius
COLLECT supplier_uuid = offer.supplier_uuid INTO offers
LET first_offer = FIRST(offers).offer
LET supplier_dist = DISTANCE(first_offer.latitude, first_offer.longitude, @lat, @lon) / 1000
// Try to find supplier node for full info
LET supplier_node = FIRST(
FOR s IN nodes
FILTER s._key == supplier_uuid
FILTER s.node_type == 'supplier'
RETURN s
)
SORT supplier_dist ASC
LIMIT @limit
RETURN {
uuid: supplier_uuid,
name: supplier_node != null ? supplier_node.name : first_offer.supplier_name,
latitude: supplier_node != null ? supplier_node.latitude : first_offer.latitude,
longitude: supplier_node != null ? supplier_node.longitude : first_offer.longitude,
distance_km: supplier_dist
}
"""

    try:
        suppliers = [
            SupplierType(
                uuid=row['uuid'],
                name=row.get('name'),
                latitude=row.get('latitude'),
                longitude=row.get('longitude'),
                distance_km=row.get('distance_km'),
            )
            for row in database.aql.execute(query, bind_vars=params)
        ]
        logger.info("Found %d suppliers near (%.3f, %.3f) within %d km", len(suppliers), lat, lon, radius)
        return suppliers
    except Exception as e:
        logger.error("Error finding nearest suppliers: %s", e)
        return []
def resolve_route_to_coordinate(self, info, offer_uuid, lat, lon):
    """Route an offer to the hub closest to the target coordinates.

    The nearest logistics node with coordinates is looked up first; the
    offer-to-hub resolver then produces the route.  Returns None when no hub
    exists or when anything fails (the failure is logged).
    """
    database = get_db()

    # Find nearest hub to target coordinates
    nearest_hub_query = """
FOR hub IN nodes
FILTER hub.node_type == 'logistics' OR hub.node_type == null
FILTER hub.product_uuid == null
FILTER hub.latitude != null AND hub.longitude != null
LET dist = DISTANCE(hub.latitude, hub.longitude, @lat, @lon) / 1000
SORT dist ASC
LIMIT 1
RETURN hub
"""
    target = {'lat': lat, 'lon': lon}

    try:
        candidates = list(database.aql.execute(nearest_hub_query, bind_vars=target))
        if len(candidates) == 0:
            logger.info("No hub found near coordinates (%.3f, %.3f)", lat, lon)
            return None

        hub_uuid = candidates[0]['_key']
        logger.info("Found nearest hub %s to coordinates (%.3f, %.3f)", hub_uuid, lat, lon)

        # Reuse the offer-to-hub search.  In graphene ``self`` is the root
        # value (None), so the resolver is called through the class.
        return Query.resolve_offer_to_hub(Query, info, offer_uuid, hub_uuid)
    except Exception as e:
        logger.error("Error finding route to coordinates: %s", e)
        return None
def resolve_hubs_list(self, info, limit=50, offset=0, country=None, transport_type=None,
                      west=None, south=None, east=None, north=None):
    """Return one page of logistics hubs ordered by name.

    Optional filters: ``country``, ``transport_type`` (when omitted only
    rail/sea hubs are listed) and a bounding box, which is applied only if
    all four of ``west``/``south``/``east``/``north`` are given.  A failure
    while running the query is logged and reported as an empty list.
    """
    database = get_db()

    # The bounding box takes effect only when every side is supplied.
    has_bounds = all(side is not None for side in (west, south, east, north))

    bounds_filter = ""
    if has_bounds:
        bounds_filter = """
FILTER node.latitude != null AND node.longitude != null
FILTER node.latitude >= @south AND node.latitude <= @north
FILTER node.longitude >= @west AND node.longitude <= @east
"""

    aql = f"""
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
FILTER node.product_uuid == null
LET types = node.transport_types != null ? node.transport_types : []
FILTER @transport_type == null OR @transport_type IN types
FILTER @transport_type != null OR ('rail' IN types) OR ('sea' IN types)
FILTER @country == null OR node.country == @country
{bounds_filter}
SORT node.name ASC
LIMIT @offset, @limit
RETURN node
"""

    bind_vars = {
        'transport_type': transport_type,
        'country': country,
        'offset': offset,
        'limit': limit,
    }
    # Bind the box sides only when the query actually references them.
    if has_bounds:
        bind_vars.update(west=west, south=south, east=east, north=north)

    try:
        hubs = [
            NodeType(
                uuid=doc['_key'],
                name=doc.get('name'),
                latitude=doc.get('latitude'),
                longitude=doc.get('longitude'),
                country=doc.get('country'),
                country_code=doc.get('country_code'),
                synced_at=doc.get('synced_at'),
                transport_types=doc.get('transport_types') or [],
                edges=[],
            )
            for doc in database.aql.execute(aql, bind_vars=bind_vars)
        ]
        logger.info("Returning %d hubs (offset=%d, limit=%d)", len(hubs), offset, limit)
        return hubs
    except Exception as e:
        logger.error("Error getting hubs list: %s", e)
        return []
def resolve_suppliers_list(self, info, limit=50, offset=0, country=None,
                           west=None, south=None, east=None, north=None):
    """Return one page of supplier nodes ordered by name.

    Optional filters: ``country`` and a bounding box, which is applied only
    if all four of ``west``/``south``/``east``/``north`` are given.  A
    failure while running the query is logged and reported as an empty list.
    """
    database = get_db()

    # The bounding box takes effect only when every side is supplied.
    has_bounds = all(side is not None for side in (west, south, east, north))

    bounds_filter = ""
    if has_bounds:
        bounds_filter = """
FILTER node.latitude != null AND node.longitude != null
FILTER node.latitude >= @south AND node.latitude <= @north
FILTER node.longitude >= @west AND node.longitude <= @east
"""

    aql = f"""
FOR node IN nodes
FILTER node.node_type == 'supplier'
FILTER @country == null OR node.country == @country
{bounds_filter}
SORT node.name ASC
LIMIT @offset, @limit
RETURN node
"""

    bind_vars = {
        'country': country,
        'offset': offset,
        'limit': limit,
    }
    # Bind the box sides only when the query actually references them.
    if has_bounds:
        bind_vars.update(west=west, south=south, east=east, north=north)

    try:
        suppliers = [
            SupplierType(
                uuid=doc['_key'],
                name=doc.get('name'),
                latitude=doc.get('latitude'),
                longitude=doc.get('longitude'),
            )
            for doc in database.aql.execute(aql, bind_vars=bind_vars)
        ]
        logger.info("Returning %d suppliers (offset=%d, limit=%d)", len(suppliers), offset, limit)
        return suppliers
    except Exception as e:
        logger.error("Error getting suppliers list: %s", e)
        return []
def resolve_products_list(self, info, limit=50, offset=0,
                          west=None, south=None, east=None, north=None):
    """Return one page of distinct products derived from offer nodes.

    Offers (optionally restricted to a bounding box, applied only if all
    four of ``west``/``south``/``east``/``north`` are given) are grouped by
    ``product_uuid``; the name comes from the first offer of each group and
    the page is ordered by that name.  A failure while running the query is
    logged and reported as an empty list.
    """
    database = get_db()

    # The bounding box takes effect only when every side is supplied.
    has_bounds = all(side is not None for side in (west, south, east, north))

    bounds_filter = ""
    if has_bounds:
        bounds_filter = """
FILTER node.latitude != null AND node.longitude != null
FILTER node.latitude >= @south AND node.latitude <= @north
FILTER node.longitude >= @west AND node.longitude <= @east
"""

    aql = f"""
FOR node IN nodes
FILTER node.node_type == 'offer'
FILTER node.product_uuid != null
{bounds_filter}
COLLECT product_uuid = node.product_uuid INTO offers
LET first_offer = FIRST(offers).node
SORT first_offer.product_name ASC
LIMIT @offset, @limit
RETURN {{
uuid: product_uuid,
name: first_offer.product_name
}}
"""

    bind_vars = {
        'offset': offset,
        'limit': limit,
    }
    # Bind the box sides only when the query actually references them.
    if has_bounds:
        bind_vars.update(west=west, south=south, east=east, north=north)

    try:
        products = []
        for row in database.aql.execute(aql, bind_vars=bind_vars):
            products.append(ProductType(uuid=row['uuid'], name=row.get('name')))
        logger.info("Returning %d products (offset=%d, limit=%d)", len(products), offset, limit)
        return products
    except Exception as e:
        logger.error("Error getting products list: %s", e)
        return []
# GraphQL schema object for this module, with Query as its root query type.
schema = graphene.Schema(query=Query)
# Module-level helper functions used by the Query resolvers for route assembly
def _build_stage(from_doc, to_doc, transport_type, edges):
    """Build a single RouteStageType covering a run of edges.

    ``from_doc`` and ``to_doc`` are the node documents at the two ends of the
    stage; when either is missing (falsy) the corresponding uuid, name and
    coordinate fields are set to None. Distance and travel time are the sums
    of the per-edge ``distance_km`` and ``travel_time_seconds`` values, with
    missing or null values counted as 0.
    """
    origin = from_doc or {}
    destination = to_doc or {}

    def _total(field):
        # Missing or null per-edge values contribute nothing to the sum.
        return sum(edge.get(field) or 0 for edge in edges)

    stage_distance = _total('distance_km')
    stage_time = _total('travel_time_seconds')

    return RouteStageType(
        from_uuid=origin.get('_key'),
        from_name=origin.get('name'),
        from_lat=origin.get('latitude'),
        from_lon=origin.get('longitude'),
        to_uuid=destination.get('_key'),
        to_name=destination.get('name'),
        to_lat=destination.get('latitude'),
        to_lon=destination.get('longitude'),
        distance_km=stage_distance,
        travel_time_seconds=stage_time,
        transport_type=transport_type,
    )
def _build_route_from_edges(path_edges, node_docs):
    """Assemble a RoutePathType from an ordered list of path edges.

    Consecutive edges that share the same ``transport_type`` are collapsed
    into one stage; a new stage starts whenever the type changes.

    Args:
        path_edges: ordered sequence of ``(from_key, to_key, edge)`` tuples
            (source -> destination), where ``edge`` is a mapping read with
            ``.get``.
        node_docs: mapping of node key to node document, used to fill in the
            endpoints of each stage. Keys that are absent yield stages with
            empty endpoint fields.

    Returns:
        A RoutePathType with the stages and the summed distance and time, or
        None when there are no edges left after dropping offer edges.
    """
    if not path_edges:
        return None

    # Offer edges only link an offer to its location; they are not transport
    # legs and must not appear as stages.
    transport_edges = [
        (from_key, to_key, edge)
        for from_key, to_key, edge in path_edges
        if edge.get('transport_type') != 'offer'
    ]
    if not transport_edges:
        return None

    stages = []
    segment_edges = []
    segment_type = None
    segment_start = None

    for from_key, _to_key, edge in transport_edges:
        edge_type = edge.get('transport_type')

        # "No open segment" is signalled by an empty segment_edges list, not
        # by segment_type being None: an edge without a transport_type would
        # otherwise look like a fresh start and the edges collected so far
        # would be silently discarded.
        if segment_edges and edge_type != segment_type:
            # Close the previous segment at the node where this edge begins.
            stages.append(_build_stage(
                node_docs.get(segment_start),
                node_docs.get(from_key),
                segment_type,
                segment_edges,
            ))
            segment_edges = []

        if not segment_edges:
            segment_type = edge_type
            segment_start = from_key
        segment_edges.append(edge)

    # Close the trailing segment at the destination of the last edge.
    last_to = transport_edges[-1][1]
    stages.append(_build_stage(
        node_docs.get(segment_start),
        node_docs.get(last_to),
        segment_type,
        segment_edges,
    ))

    total_distance = sum(stage.distance_km or 0 for stage in stages)
    total_time = sum(stage.travel_time_seconds or 0 for stage in stages)

    return RoutePathType(
        total_distance_km=total_distance,
        total_time_seconds=total_time,
        stages=stages,
    )
# Expose the module-level helper as an attribute of Query so resolver code
# can reach it through the class.
Query._build_route_from_edges = _build_route_from_edges
def _distance_km(lat1, lon1, lat2, lon2):
"""Haversine distance in km."""
r = 6371
d_lat = math.radians(lat2 - lat1)
d_lon = math.radians(lon2 - lon1)
a = (
math.sin(d_lat / 2) ** 2
+ math.cos(math.radians(lat1))
* math.cos(math.radians(lat2))
* math.sin(d_lon / 2) ** 2
)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return r * c
# Expose the haversine helper as an attribute of Query as well.
Query._distance_km = _distance_km