Files
geo/geo_app/schema.py
Ruslan Bakiev e15976382e
All checks were successful
Build Docker Image / build (push) Successful in 1m45s
Add hubCountries query and country filter for nodes
2026-01-08 10:42:34 +07:00

834 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""GraphQL schema for Geo service."""
import logging
import heapq
import math
import requests
import graphene
from django.conf import settings
from .arango_client import get_db, ensure_graph
logger = logging.getLogger(__name__)
class EdgeType(graphene.ObjectType):
"""Edge between two nodes (route)."""
to_uuid = graphene.String()
to_name = graphene.String()
to_latitude = graphene.Float()
to_longitude = graphene.Float()
distance_km = graphene.Float()
travel_time_seconds = graphene.Int()
transport_type = graphene.String()
class NodeType(graphene.ObjectType):
"""Logistics node with edges to neighbors."""
uuid = graphene.String()
name = graphene.String()
latitude = graphene.Float()
longitude = graphene.Float()
country = graphene.String()
country_code = graphene.String()
synced_at = graphene.String()
transport_types = graphene.List(graphene.String)
edges = graphene.List(EdgeType)
class NodeConnectionsType(graphene.ObjectType):
"""Auto + rail edges for a node, rail uses nearest rail node."""
hub = graphene.Field(NodeType)
rail_node = graphene.Field(NodeType)
auto_edges = graphene.List(EdgeType)
rail_edges = graphene.List(EdgeType)
class RouteType(graphene.ObjectType):
"""Route between two points with geometry."""
distance_km = graphene.Float()
geometry = graphene.JSONString(description="GeoJSON LineString coordinates")
class RouteStageType(graphene.ObjectType):
"""Single stage in a multi-hop route."""
from_uuid = graphene.String()
from_name = graphene.String()
from_lat = graphene.Float()
from_lon = graphene.Float()
to_uuid = graphene.String()
to_name = graphene.String()
to_lat = graphene.Float()
to_lon = graphene.Float()
distance_km = graphene.Float()
travel_time_seconds = graphene.Int()
transport_type = graphene.String()
class RoutePathType(graphene.ObjectType):
"""Complete route through graph with multiple stages."""
total_distance_km = graphene.Float()
total_time_seconds = graphene.Int()
stages = graphene.List(RouteStageType)
class ProductRouteOptionType(graphene.ObjectType):
"""Route options for a product source to the destination."""
source_uuid = graphene.String()
source_name = graphene.String()
source_lat = graphene.Float()
source_lon = graphene.Float()
distance_km = graphene.Float()
routes = graphene.List(RoutePathType)
class Query(graphene.ObjectType):
"""Root query."""
MAX_EXPANSIONS = 20000
node = graphene.Field(
NodeType,
uuid=graphene.String(required=True),
description="Get node by UUID with all edges to neighbors",
)
nodes = graphene.List(
NodeType,
description="Get all nodes (without edges for performance)",
limit=graphene.Int(),
offset=graphene.Int(),
transport_type=graphene.String(),
country=graphene.String(description="Filter by country name"),
search=graphene.String(description="Search by node name (case-insensitive)"),
)
nodes_count = graphene.Int(
transport_type=graphene.String(),
country=graphene.String(description="Filter by country name"),
description="Get total count of nodes (with optional transport/country filter)",
)
hub_countries = graphene.List(
graphene.String,
description="List of countries that have logistics hubs",
)
nearest_nodes = graphene.List(
NodeType,
lat=graphene.Float(required=True, description="Latitude"),
lon=graphene.Float(required=True, description="Longitude"),
limit=graphene.Int(default_value=5, description="Max results"),
description="Find nearest logistics nodes to given coordinates",
)
node_connections = graphene.Field(
NodeConnectionsType,
uuid=graphene.String(required=True),
limit_auto=graphene.Int(default_value=12),
limit_rail=graphene.Int(default_value=12),
description="Get auto + rail edges for a node (rail uses nearest rail node)",
)
auto_route = graphene.Field(
RouteType,
from_lat=graphene.Float(required=True),
from_lon=graphene.Float(required=True),
to_lat=graphene.Float(required=True),
to_lon=graphene.Float(required=True),
description="Get auto route between two points via GraphHopper",
)
rail_route = graphene.Field(
RouteType,
from_lat=graphene.Float(required=True),
from_lon=graphene.Float(required=True),
to_lat=graphene.Float(required=True),
to_lon=graphene.Float(required=True),
description="Get rail route between two points via OpenRailRouting",
)
find_routes = graphene.List(
RoutePathType,
from_uuid=graphene.String(required=True),
to_uuid=graphene.String(required=True),
limit=graphene.Int(default_value=3),
description="Find K shortest routes through graph between two nodes",
)
find_product_routes = graphene.List(
ProductRouteOptionType,
product_uuid=graphene.String(required=True),
to_uuid=graphene.String(required=True),
limit_sources=graphene.Int(default_value=3),
limit_routes=graphene.Int(default_value=3),
description="Find routes from product offer nodes to destination",
)
@staticmethod
def _build_routes(db, from_uuid, to_uuid, limit):
"""Shared helper to compute K shortest routes between two nodes."""
aql = """
FOR path IN ANY K_SHORTEST_PATHS
@from_vertex TO @to_vertex
GRAPH 'optovia_graph'
OPTIONS { weightAttribute: 'distance_km' }
LIMIT @limit
RETURN {
vertices: path.vertices,
edges: path.edges,
weight: path.weight
}
"""
try:
cursor = db.aql.execute(
aql,
bind_vars={
'from_vertex': f'nodes/{from_uuid}',
'to_vertex': f'nodes/{to_uuid}',
'limit': limit,
},
)
paths = list(cursor)
except Exception as e:
logger.error("K_SHORTEST_PATHS query failed: %s", e)
return []
if not paths:
logger.info("No paths found from %s to %s", from_uuid, to_uuid)
return []
routes = []
for path in paths:
vertices = path.get('vertices', [])
edges = path.get('edges', [])
# Build vertex lookup by _id
vertex_by_id = {v['_id']: v for v in vertices}
stages = []
for edge in edges:
from_node = vertex_by_id.get(edge['_from'], {})
to_node = vertex_by_id.get(edge['_to'], {})
stages.append(RouteStageType(
from_uuid=from_node.get('_key'),
from_name=from_node.get('name'),
from_lat=from_node.get('latitude'),
from_lon=from_node.get('longitude'),
to_uuid=to_node.get('_key'),
to_name=to_node.get('name'),
to_lat=to_node.get('latitude'),
to_lon=to_node.get('longitude'),
distance_km=edge.get('distance_km'),
travel_time_seconds=edge.get('travel_time_seconds'),
transport_type=edge.get('transport_type'),
))
total_time = sum(s.travel_time_seconds or 0 for s in stages)
routes.append(RoutePathType(
total_distance_km=path.get('weight'),
total_time_seconds=total_time,
stages=stages,
))
return routes
def resolve_node(self, info, uuid):
"""
Get a single node with all its outgoing edges.
Returns node info + list of edges to neighbors with distances.
"""
db = get_db()
# Get node
nodes_col = db.collection('nodes')
node = nodes_col.get(uuid)
if not node:
return None
# Get all outgoing edges from this node
edges_col = db.collection('edges')
aql = """
FOR edge IN edges
FILTER edge._from == @from_id
LET to_node = DOCUMENT(edge._to)
RETURN {
to_uuid: to_node._key,
to_name: to_node.name,
to_latitude: to_node.latitude,
to_longitude: to_node.longitude,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds,
transport_type: edge.transport_type
}
"""
cursor = db.aql.execute(aql, bind_vars={'from_id': f"nodes/{uuid}"})
edges = list(cursor)
logger.info("Node %s has %d edges", uuid, len(edges))
return NodeType(
uuid=node['_key'],
name=node.get('name'),
latitude=node.get('latitude'),
longitude=node.get('longitude'),
country=node.get('country'),
country_code=node.get('country_code'),
synced_at=node.get('synced_at'),
transport_types=node.get('transport_types') or [],
edges=[EdgeType(**e) for e in edges],
)
def resolve_nodes(self, info, limit=None, offset=None, transport_type=None, country=None, search=None):
"""Get all logistics nodes (without edges for list view)."""
db = get_db()
# Only return logistics nodes (not buyer/seller addresses)
aql = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
LET types = node.transport_types != null ? node.transport_types : []
FILTER @transport_type == null OR @transport_type IN types
FILTER @country == null OR node.country == @country
FILTER @search == null OR CONTAINS(LOWER(node.name), LOWER(@search)) OR CONTAINS(LOWER(node.country), LOWER(@search))
SORT node.name ASC
LIMIT @offset, @limit
RETURN node
"""
cursor = db.aql.execute(
aql,
bind_vars={
'transport_type': transport_type,
'country': country,
'search': search,
'offset': 0 if offset is None else offset,
'limit': 1000000 if limit is None else limit,
},
)
nodes = []
for node in cursor:
nodes.append(NodeType(
uuid=node['_key'],
name=node.get('name'),
latitude=node.get('latitude'),
longitude=node.get('longitude'),
country=node.get('country'),
country_code=node.get('country_code'),
synced_at=node.get('synced_at'),
transport_types=node.get('transport_types') or [],
edges=[], # Don't load edges for list
))
logger.info("Returning %d nodes", len(nodes))
return nodes
def resolve_nodes_count(self, info, transport_type=None, country=None):
db = get_db()
aql = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
LET types = node.transport_types != null ? node.transport_types : []
FILTER @transport_type == null OR @transport_type IN types
FILTER @country == null OR node.country == @country
COLLECT WITH COUNT INTO length
RETURN length
"""
cursor = db.aql.execute(aql, bind_vars={'transport_type': transport_type, 'country': country})
return next(cursor, 0)
def resolve_hub_countries(self, info):
"""Get unique country names from logistics hubs."""
db = get_db()
aql = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
FILTER node.country != null
COLLECT country = node.country
SORT country ASC
RETURN country
"""
cursor = db.aql.execute(aql)
return list(cursor)
def resolve_nearest_nodes(self, info, lat, lon, limit=5):
"""Find nearest logistics nodes to given coordinates."""
db = get_db()
# Get all logistics nodes and calculate distance
aql = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
FILTER node.latitude != null AND node.longitude != null
LET dist = DISTANCE(node.latitude, node.longitude, @lat, @lon) / 1000
SORT dist ASC
LIMIT @limit
RETURN MERGE(node, {distance_km: dist})
"""
cursor = db.aql.execute(
aql,
bind_vars={'lat': lat, 'lon': lon, 'limit': limit},
)
nodes = []
for node in cursor:
nodes.append(NodeType(
uuid=node['_key'],
name=node.get('name'),
latitude=node.get('latitude'),
longitude=node.get('longitude'),
country=node.get('country'),
country_code=node.get('country_code'),
synced_at=node.get('synced_at'),
transport_types=node.get('transport_types') or [],
edges=[],
))
return nodes
def resolve_node_connections(self, info, uuid, limit_auto=12, limit_rail=12):
"""Get auto edges from hub and rail edges from nearest rail node."""
db = get_db()
nodes_col = db.collection('nodes')
hub = nodes_col.get(uuid)
if not hub:
return None
aql = """
LET auto_edges = (
FOR edge IN edges
FILTER edge._from == @from_id AND edge.transport_type == "auto"
LET to_node = DOCUMENT(edge._to)
FILTER to_node != null
SORT edge.distance_km ASC
LIMIT @limit_auto
RETURN {
to_uuid: to_node._key,
to_name: to_node.name,
to_latitude: to_node.latitude,
to_longitude: to_node.longitude,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds,
transport_type: edge.transport_type
}
)
LET hub_has_rail = @hub_has_rail
LET rail_node = hub_has_rail ? DOCUMENT(@from_id) : FIRST(
FOR node IN nodes
FILTER node.latitude != null AND node.longitude != null
FILTER 'rail' IN node.transport_types
SORT DISTANCE(@hub_lat, @hub_lon, node.latitude, node.longitude)
LIMIT 1
RETURN node
)
LET rail_edges = rail_node == null ? [] : (
FOR edge IN edges
FILTER edge._from == CONCAT("nodes/", rail_node._key) AND edge.transport_type == "rail"
LET to_node = DOCUMENT(edge._to)
FILTER to_node != null
SORT edge.distance_km ASC
LIMIT @limit_rail
RETURN {
to_uuid: to_node._key,
to_name: to_node.name,
to_latitude: to_node.latitude,
to_longitude: to_node.longitude,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds,
transport_type: edge.transport_type
}
)
RETURN {
hub: DOCUMENT(@from_id),
rail_node: rail_node,
auto_edges: auto_edges,
rail_edges: rail_edges
}
"""
cursor = db.aql.execute(
aql,
bind_vars={
'from_id': f"nodes/{uuid}",
'hub_lat': hub.get('latitude'),
'hub_lon': hub.get('longitude'),
'hub_has_rail': 'rail' in (hub.get('transport_types') or []),
'limit_auto': limit_auto,
'limit_rail': limit_rail,
},
)
result = next(cursor, None)
if not result:
return None
def build_node(doc):
if not doc:
return None
return NodeType(
uuid=doc['_key'],
name=doc.get('name'),
latitude=doc.get('latitude'),
longitude=doc.get('longitude'),
country=doc.get('country'),
country_code=doc.get('country_code'),
synced_at=doc.get('synced_at'),
transport_types=doc.get('transport_types') or [],
edges=[],
)
return NodeConnectionsType(
hub=build_node(result.get('hub')),
rail_node=build_node(result.get('rail_node')),
auto_edges=[EdgeType(**e) for e in result.get('auto_edges') or []],
rail_edges=[EdgeType(**e) for e in result.get('rail_edges') or []],
)
def resolve_auto_route(self, info, from_lat, from_lon, to_lat, to_lon):
"""Get auto route via GraphHopper."""
url = f"{settings.GRAPHHOPPER_EXTERNAL_URL}/route"
params = {
'point': [f"{from_lat},{from_lon}", f"{to_lat},{to_lon}"],
'profile': 'car',
'instructions': 'false',
'calc_points': 'true',
'points_encoded': 'false',
}
try:
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
data = response.json()
if 'paths' in data and len(data['paths']) > 0:
path = data['paths'][0]
distance_km = round(path.get('distance', 0) / 1000, 2)
points = path.get('points', {})
coordinates = points.get('coordinates', [])
return RouteType(
distance_km=distance_km,
geometry=coordinates,
)
except requests.RequestException as e:
logger.error("GraphHopper request failed: %s", e)
return None
def resolve_rail_route(self, info, from_lat, from_lon, to_lat, to_lon):
"""Get rail route via OpenRailRouting."""
url = f"{settings.OPENRAILROUTING_EXTERNAL_URL}/route"
params = {
'point': [f"{from_lat},{from_lon}", f"{to_lat},{to_lon}"],
'profile': 'all_tracks',
'calc_points': 'true',
'points_encoded': 'false',
}
try:
response = requests.get(url, params=params, timeout=60)
response.raise_for_status()
data = response.json()
if 'paths' in data and len(data['paths']) > 0:
path = data['paths'][0]
distance_km = round(path.get('distance', 0) / 1000, 2)
points = path.get('points', {})
coordinates = points.get('coordinates', [])
return RouteType(
distance_km=distance_km,
geometry=coordinates,
)
except requests.RequestException as e:
logger.error("OpenRailRouting request failed: %s", e)
return None
def resolve_find_routes(self, info, from_uuid, to_uuid, limit=3):
"""Find K shortest routes through graph using ArangoDB K_SHORTEST_PATHS."""
db = get_db()
ensure_graph()
return Query._build_routes(db, from_uuid, to_uuid, limit)
def resolve_find_product_routes(self, info, product_uuid, to_uuid, limit_sources=3, limit_routes=3):
"""
Найти до N ближайших офферов и вернуть по одному маршруту:
авто -> (rail сколько угодно) -> авто. Поиск идёт от точки назначения наружу.
"""
db = get_db()
ensure_graph() # graph exists, но используем ручной обход
# Load destination node for distance sorting
nodes_col = db.collection('nodes')
dest = nodes_col.get(to_uuid)
if not dest:
logger.info("Destination node %s not found", to_uuid)
return []
dest_lat = dest.get('latitude')
dest_lon = dest.get('longitude')
if dest_lat is None or dest_lon is None:
logger.info("Destination node %s missing coordinates", to_uuid)
return []
max_sources = limit_sources or 5
max_routes = 1 # всегда один маршрут на оффер
# Helpers
def allowed_next_phase(current_phase, transport_type):
"""
Phases — расширение радиуса поиска, ЖД не обязателен:
- end_auto: можно 1 авто, rail, или сразу offer
- end_auto_done: авто использовано — rail или offer
- rail: любое кол-во rail, потом 1 авто или offer
- start_auto_done: авто использовано — только offer
Offer можно найти на любом этапе!
"""
if current_phase == 'end_auto':
if transport_type == 'offer':
return 'offer' # нашли сразу рядом
if transport_type == 'auto':
return 'end_auto_done'
if transport_type == 'rail':
return 'rail'
return None
if current_phase == 'end_auto_done':
if transport_type == 'offer':
return 'offer' # нашли после 1 авто
if transport_type == 'rail':
return 'rail'
return None
if current_phase == 'rail':
if transport_type == 'offer':
return 'offer' # нашли на ЖД станции
if transport_type == 'rail':
return 'rail'
if transport_type == 'auto':
return 'start_auto_done'
return None
if current_phase == 'start_auto_done':
if transport_type == 'offer':
return 'offer'
return None
return None
def fetch_neighbors(node_key, phase):
"""Получить соседей с учётом допустимых типов транспорта."""
# offer доступен на всех фазах — ищем ближайший
if phase == 'end_auto':
types = ['auto', 'rail', 'offer']
elif phase == 'end_auto_done':
types = ['rail', 'offer']
elif phase == 'rail':
types = ['rail', 'auto', 'offer']
elif phase == 'start_auto_done':
types = ['offer']
else:
types = ['offer']
aql = """
FOR edge IN edges
FILTER edge.transport_type IN @types
FILTER edge._from == @node_id OR edge._to == @node_id
LET neighbor_id = edge._from == @node_id ? edge._to : edge._from
LET neighbor = DOCUMENT(neighbor_id)
FILTER neighbor != null
RETURN {
neighbor_key: neighbor._key,
neighbor_doc: neighbor,
from_id: edge._from,
to_id: edge._to,
transport_type: edge.transport_type,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds
}
"""
cursor = db.aql.execute(
aql,
bind_vars={
'node_id': f"nodes/{node_key}",
'types': types,
},
)
return list(cursor)
# Priority queue: (cost, seq, node_key, phase)
queue = []
counter = 0
heapq.heappush(queue, (0, counter, to_uuid, 'end_auto'))
visited = {} # (node, phase) -> best_cost
predecessors = {} # (node, phase) -> (prev_node, prev_phase, edge_info)
node_docs = {to_uuid: dest}
found_routes = []
expansions = 0
while queue and len(found_routes) < max_sources and expansions < Query.MAX_EXPANSIONS:
cost, _, node_key, phase = heapq.heappop(queue)
if (node_key, phase) in visited and cost > visited[(node_key, phase)]:
continue
# Если нашли оффер нужного товара в допустимой фазе, фиксируем маршрут
node_doc = node_docs.get(node_key)
if node_doc and node_doc.get('product_uuid') == product_uuid:
path_edges = []
state = (node_key, phase)
current_key = node_key
while state in predecessors:
prev_state, edge_info = predecessors[state]
prev_key = prev_state[0]
path_edges.append((current_key, prev_key, edge_info)) # from source toward dest
state = prev_state
current_key = prev_key
route = _build_route_from_edges(path_edges, node_docs)
distance_km = None
src_lat = node_doc.get('latitude')
src_lon = node_doc.get('longitude')
if src_lat is not None and src_lon is not None:
distance_km = _distance_km(src_lat, src_lon, dest_lat, dest_lon)
found_routes.append(ProductRouteOptionType(
source_uuid=node_key,
source_name=node_doc.get('name'),
source_lat=node_doc.get('latitude'),
source_lon=node_doc.get('longitude'),
distance_km=distance_km,
routes=[route] if route else [],
))
# продолжаем искать остальных
continue
neighbors = fetch_neighbors(node_key, phase)
expansions += 1
for neighbor in neighbors:
transport_type = neighbor.get('transport_type')
next_phase = allowed_next_phase(phase, transport_type)
if next_phase is None:
continue
travel_time = neighbor.get('travel_time_seconds')
distance_km = neighbor.get('distance_km')
neighbor_key = neighbor.get('neighbor_key')
node_docs[neighbor_key] = neighbor.get('neighbor_doc')
step_cost = travel_time if travel_time is not None else (distance_km or 0)
new_cost = cost + step_cost
state_key = (neighbor_key, next_phase)
if state_key in visited and new_cost >= visited[state_key]:
continue
visited[state_key] = new_cost
counter += 1
heapq.heappush(queue, (new_cost, counter, neighbor_key, next_phase))
predecessors[state_key] = ((node_key, phase), neighbor)
if not found_routes:
logger.info("No product routes found for %s -> %s", product_uuid, to_uuid)
return []
return found_routes
schema = graphene.Schema(query=Query)
# Helper methods attached to Query for route assembly
def _build_stage(from_doc, to_doc, transport_type, edges):
distance_km = sum(e.get('distance_km') or 0 for e in edges)
travel_time = sum(e.get('travel_time_seconds') or 0 for e in edges)
return RouteStageType(
from_uuid=from_doc.get('_key') if from_doc else None,
from_name=from_doc.get('name') if from_doc else None,
from_lat=from_doc.get('latitude') if from_doc else None,
from_lon=from_doc.get('longitude') if from_doc else None,
to_uuid=to_doc.get('_key') if to_doc else None,
to_name=to_doc.get('name') if to_doc else None,
to_lat=to_doc.get('latitude') if to_doc else None,
to_lon=to_doc.get('longitude') if to_doc else None,
distance_km=distance_km,
travel_time_seconds=travel_time,
transport_type=transport_type,
)
def _build_route_from_edges(path_edges, node_docs):
"""Собрать RoutePathType из списка ребёр (source->dest), схлопывая типы."""
if not path_edges:
return None
stages = []
current_edges = []
current_type = None
segment_start = None
for from_key, to_key, edge in path_edges:
edge_type = edge.get('transport_type')
if current_type is None:
current_type = edge_type
current_edges = [edge]
segment_start = from_key
elif edge_type == current_type:
current_edges.append(edge)
else:
# закрываем предыдущий сегмент
stages.append(_build_stage(
node_docs.get(segment_start),
node_docs.get(from_key),
current_type,
current_edges,
))
current_type = edge_type
current_edges = [edge]
segment_start = from_key
# Последний сгусток
last_to = path_edges[-1][1]
stages.append(_build_stage(
node_docs.get(segment_start),
node_docs.get(last_to),
current_type,
current_edges,
))
total_distance = sum(s.distance_km or 0 for s in stages)
total_time = sum(s.travel_time_seconds or 0 for s in stages)
return RoutePathType(
total_distance_km=total_distance,
total_time_seconds=total_time,
stages=stages,
)
# Bind helpers to class for access in resolver
Query._build_route_from_edges = _build_route_from_edges
def _distance_km(lat1, lon1, lat2, lon2):
"""Haversine distance in km."""
r = 6371
d_lat = math.radians(lat2 - lat1)
d_lon = math.radians(lon2 - lon1)
a = (
math.sin(d_lat / 2) ** 2
+ math.cos(math.radians(lat1))
* math.cos(math.radians(lat2))
* math.sin(d_lon / 2) ** 2
)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return r * c
Query._distance_km = _distance_km