"""GraphQL schema for Geo service.""" import logging import heapq import math import requests import graphene from django.conf import settings from .arango_client import get_db, ensure_graph logger = logging.getLogger(__name__) class EdgeType(graphene.ObjectType): """Edge between two nodes (route).""" to_uuid = graphene.String() to_name = graphene.String() to_latitude = graphene.Float() to_longitude = graphene.Float() distance_km = graphene.Float() travel_time_seconds = graphene.Int() transport_type = graphene.String() class NodeType(graphene.ObjectType): """Logistics node with edges to neighbors.""" uuid = graphene.String() name = graphene.String() latitude = graphene.Float() longitude = graphene.Float() country = graphene.String() country_code = graphene.String() synced_at = graphene.String() transport_types = graphene.List(graphene.String) edges = graphene.List(EdgeType) class NodeConnectionsType(graphene.ObjectType): """Auto + rail edges for a node, rail uses nearest rail node.""" hub = graphene.Field(NodeType) rail_node = graphene.Field(NodeType) auto_edges = graphene.List(EdgeType) rail_edges = graphene.List(EdgeType) class RouteType(graphene.ObjectType): """Route between two points with geometry.""" distance_km = graphene.Float() geometry = graphene.JSONString(description="GeoJSON LineString coordinates") class RouteStageType(graphene.ObjectType): """Single stage in a multi-hop route.""" from_uuid = graphene.String() from_name = graphene.String() from_lat = graphene.Float() from_lon = graphene.Float() to_uuid = graphene.String() to_name = graphene.String() to_lat = graphene.Float() to_lon = graphene.Float() distance_km = graphene.Float() travel_time_seconds = graphene.Int() transport_type = graphene.String() class RoutePathType(graphene.ObjectType): """Complete route through graph with multiple stages.""" total_distance_km = graphene.Float() total_time_seconds = graphene.Int() stages = graphene.List(RouteStageType) class ProductRouteOptionType(graphene.ObjectType): """Route options for a product source to the destination.""" source_uuid = graphene.String() source_name = graphene.String() source_lat = graphene.Float() source_lon = graphene.Float() distance_km = graphene.Float() routes = graphene.List(RoutePathType) class Query(graphene.ObjectType): """Root query.""" MAX_EXPANSIONS = 20000 node = graphene.Field( NodeType, uuid=graphene.String(required=True), description="Get node by UUID with all edges to neighbors", ) nodes = graphene.List( NodeType, description="Get all nodes (without edges for performance)", limit=graphene.Int(), offset=graphene.Int(), transport_type=graphene.String(), search=graphene.String(description="Search by node name (case-insensitive)"), ) nodes_count = graphene.Int( transport_type=graphene.String(), description="Get total count of nodes (with optional transport filter)", ) node_connections = graphene.Field( NodeConnectionsType, uuid=graphene.String(required=True), limit_auto=graphene.Int(default_value=12), limit_rail=graphene.Int(default_value=12), description="Get auto + rail edges for a node (rail uses nearest rail node)", ) auto_route = graphene.Field( RouteType, from_lat=graphene.Float(required=True), from_lon=graphene.Float(required=True), to_lat=graphene.Float(required=True), to_lon=graphene.Float(required=True), description="Get auto route between two points via GraphHopper", ) rail_route = graphene.Field( RouteType, from_lat=graphene.Float(required=True), from_lon=graphene.Float(required=True), to_lat=graphene.Float(required=True), to_lon=graphene.Float(required=True), description="Get rail route between two points via OpenRailRouting", ) find_routes = graphene.List( RoutePathType, from_uuid=graphene.String(required=True), to_uuid=graphene.String(required=True), limit=graphene.Int(default_value=3), description="Find K shortest routes through graph between two nodes", ) find_product_routes = graphene.List( ProductRouteOptionType, product_uuid=graphene.String(required=True), to_uuid=graphene.String(required=True), limit_sources=graphene.Int(default_value=3), limit_routes=graphene.Int(default_value=3), description="Find routes from product offer nodes to destination", ) @staticmethod def _build_routes(db, from_uuid, to_uuid, limit): """Shared helper to compute K shortest routes between two nodes.""" aql = """ FOR path IN ANY K_SHORTEST_PATHS @from_vertex TO @to_vertex GRAPH 'optovia_graph' OPTIONS { weightAttribute: 'distance_km' } LIMIT @limit RETURN { vertices: path.vertices, edges: path.edges, weight: path.weight } """ try: cursor = db.aql.execute( aql, bind_vars={ 'from_vertex': f'nodes/{from_uuid}', 'to_vertex': f'nodes/{to_uuid}', 'limit': limit, }, ) paths = list(cursor) except Exception as e: logger.error("K_SHORTEST_PATHS query failed: %s", e) return [] if not paths: logger.info("No paths found from %s to %s", from_uuid, to_uuid) return [] routes = [] for path in paths: vertices = path.get('vertices', []) edges = path.get('edges', []) # Build vertex lookup by _id vertex_by_id = {v['_id']: v for v in vertices} stages = [] for edge in edges: from_node = vertex_by_id.get(edge['_from'], {}) to_node = vertex_by_id.get(edge['_to'], {}) stages.append(RouteStageType( from_uuid=from_node.get('_key'), from_name=from_node.get('name'), from_lat=from_node.get('latitude'), from_lon=from_node.get('longitude'), to_uuid=to_node.get('_key'), to_name=to_node.get('name'), to_lat=to_node.get('latitude'), to_lon=to_node.get('longitude'), distance_km=edge.get('distance_km'), travel_time_seconds=edge.get('travel_time_seconds'), transport_type=edge.get('transport_type'), )) total_time = sum(s.travel_time_seconds or 0 for s in stages) routes.append(RoutePathType( total_distance_km=path.get('weight'), total_time_seconds=total_time, stages=stages, )) return routes def resolve_node(self, info, uuid): """ Get a single node with all its outgoing edges. Returns node info + list of edges to neighbors with distances. """ db = get_db() # Get node nodes_col = db.collection('nodes') node = nodes_col.get(uuid) if not node: return None # Get all outgoing edges from this node edges_col = db.collection('edges') aql = """ FOR edge IN edges FILTER edge._from == @from_id LET to_node = DOCUMENT(edge._to) RETURN { to_uuid: to_node._key, to_name: to_node.name, to_latitude: to_node.latitude, to_longitude: to_node.longitude, distance_km: edge.distance_km, travel_time_seconds: edge.travel_time_seconds, transport_type: edge.transport_type } """ cursor = db.aql.execute(aql, bind_vars={'from_id': f"nodes/{uuid}"}) edges = list(cursor) logger.info("Node %s has %d edges", uuid, len(edges)) return NodeType( uuid=node['_key'], name=node.get('name'), latitude=node.get('latitude'), longitude=node.get('longitude'), country=node.get('country'), country_code=node.get('country_code'), synced_at=node.get('synced_at'), transport_types=node.get('transport_types') or [], edges=[EdgeType(**e) for e in edges], ) def resolve_nodes(self, info, limit=None, offset=None, transport_type=None, search=None): """Get all logistics nodes (without edges for list view).""" db = get_db() # Only return logistics nodes (not buyer/seller addresses) aql = """ FOR node IN nodes FILTER node.node_type == 'logistics' OR node.node_type == null LET types = node.transport_types != null ? node.transport_types : [] FILTER @transport_type == null OR @transport_type IN types FILTER @search == null OR CONTAINS(LOWER(node.name), LOWER(@search)) OR CONTAINS(LOWER(node.country), LOWER(@search)) SORT node.name ASC LIMIT @offset, @limit RETURN node """ cursor = db.aql.execute( aql, bind_vars={ 'transport_type': transport_type, 'search': search, 'offset': 0 if offset is None else offset, 'limit': 1000000 if limit is None else limit, }, ) nodes = [] for node in cursor: nodes.append(NodeType( uuid=node['_key'], name=node.get('name'), latitude=node.get('latitude'), longitude=node.get('longitude'), country=node.get('country'), country_code=node.get('country_code'), synced_at=node.get('synced_at'), transport_types=node.get('transport_types') or [], edges=[], # Don't load edges for list )) logger.info("Returning %d nodes", len(nodes)) return nodes def resolve_nodes_count(self, info, transport_type=None): db = get_db() aql = """ FOR node IN nodes FILTER node.node_type == 'logistics' OR node.node_type == null LET types = node.transport_types != null ? node.transport_types : [] FILTER @transport_type == null OR @transport_type IN types COLLECT WITH COUNT INTO length RETURN length """ cursor = db.aql.execute(aql, bind_vars={'transport_type': transport_type}) return next(cursor, 0) def resolve_node_connections(self, info, uuid, limit_auto=12, limit_rail=12): """Get auto edges from hub and rail edges from nearest rail node.""" db = get_db() nodes_col = db.collection('nodes') hub = nodes_col.get(uuid) if not hub: return None aql = """ LET auto_edges = ( FOR edge IN edges FILTER edge._from == @from_id AND edge.transport_type == "auto" LET to_node = DOCUMENT(edge._to) FILTER to_node != null SORT edge.distance_km ASC LIMIT @limit_auto RETURN { to_uuid: to_node._key, to_name: to_node.name, to_latitude: to_node.latitude, to_longitude: to_node.longitude, distance_km: edge.distance_km, travel_time_seconds: edge.travel_time_seconds, transport_type: edge.transport_type } ) LET hub_has_rail = @hub_has_rail LET rail_node = hub_has_rail ? DOCUMENT(@from_id) : FIRST( FOR node IN nodes FILTER node.latitude != null AND node.longitude != null FILTER 'rail' IN node.transport_types SORT DISTANCE(@hub_lat, @hub_lon, node.latitude, node.longitude) LIMIT 1 RETURN node ) LET rail_edges = rail_node == null ? [] : ( FOR edge IN edges FILTER edge._from == CONCAT("nodes/", rail_node._key) AND edge.transport_type == "rail" LET to_node = DOCUMENT(edge._to) FILTER to_node != null SORT edge.distance_km ASC LIMIT @limit_rail RETURN { to_uuid: to_node._key, to_name: to_node.name, to_latitude: to_node.latitude, to_longitude: to_node.longitude, distance_km: edge.distance_km, travel_time_seconds: edge.travel_time_seconds, transport_type: edge.transport_type } ) RETURN { hub: DOCUMENT(@from_id), rail_node: rail_node, auto_edges: auto_edges, rail_edges: rail_edges } """ cursor = db.aql.execute( aql, bind_vars={ 'from_id': f"nodes/{uuid}", 'hub_lat': hub.get('latitude'), 'hub_lon': hub.get('longitude'), 'hub_has_rail': 'rail' in (hub.get('transport_types') or []), 'limit_auto': limit_auto, 'limit_rail': limit_rail, }, ) result = next(cursor, None) if not result: return None def build_node(doc): if not doc: return None return NodeType( uuid=doc['_key'], name=doc.get('name'), latitude=doc.get('latitude'), longitude=doc.get('longitude'), country=doc.get('country'), country_code=doc.get('country_code'), synced_at=doc.get('synced_at'), transport_types=doc.get('transport_types') or [], edges=[], ) return NodeConnectionsType( hub=build_node(result.get('hub')), rail_node=build_node(result.get('rail_node')), auto_edges=[EdgeType(**e) for e in result.get('auto_edges') or []], rail_edges=[EdgeType(**e) for e in result.get('rail_edges') or []], ) def resolve_auto_route(self, info, from_lat, from_lon, to_lat, to_lon): """Get auto route via GraphHopper.""" url = f"{settings.GRAPHHOPPER_EXTERNAL_URL}/route" params = { 'point': [f"{from_lat},{from_lon}", f"{to_lat},{to_lon}"], 'profile': 'car', 'instructions': 'false', 'calc_points': 'true', 'points_encoded': 'false', } try: response = requests.get(url, params=params, timeout=30) response.raise_for_status() data = response.json() if 'paths' in data and len(data['paths']) > 0: path = data['paths'][0] distance_km = round(path.get('distance', 0) / 1000, 2) points = path.get('points', {}) coordinates = points.get('coordinates', []) return RouteType( distance_km=distance_km, geometry=coordinates, ) except requests.RequestException as e: logger.error("GraphHopper request failed: %s", e) return None def resolve_rail_route(self, info, from_lat, from_lon, to_lat, to_lon): """Get rail route via OpenRailRouting.""" url = f"{settings.OPENRAILROUTING_EXTERNAL_URL}/route" params = { 'point': [f"{from_lat},{from_lon}", f"{to_lat},{to_lon}"], 'profile': 'all_tracks', 'calc_points': 'true', 'points_encoded': 'false', } try: response = requests.get(url, params=params, timeout=60) response.raise_for_status() data = response.json() if 'paths' in data and len(data['paths']) > 0: path = data['paths'][0] distance_km = round(path.get('distance', 0) / 1000, 2) points = path.get('points', {}) coordinates = points.get('coordinates', []) return RouteType( distance_km=distance_km, geometry=coordinates, ) except requests.RequestException as e: logger.error("OpenRailRouting request failed: %s", e) return None def resolve_find_routes(self, info, from_uuid, to_uuid, limit=3): """Find K shortest routes through graph using ArangoDB K_SHORTEST_PATHS.""" db = get_db() ensure_graph() return Query._build_routes(db, from_uuid, to_uuid, limit) def resolve_find_product_routes(self, info, product_uuid, to_uuid, limit_sources=3, limit_routes=3): """ Найти до N ближайших офферов и вернуть по одному маршруту: авто -> (rail сколько угодно) -> авто. Поиск идёт от точки назначения наружу. """ db = get_db() ensure_graph() # graph exists, но используем ручной обход # Load destination node for distance sorting nodes_col = db.collection('nodes') dest = nodes_col.get(to_uuid) if not dest: logger.info("Destination node %s not found", to_uuid) return [] dest_lat = dest.get('latitude') dest_lon = dest.get('longitude') if dest_lat is None or dest_lon is None: logger.info("Destination node %s missing coordinates", to_uuid) return [] max_sources = limit_sources or 5 max_routes = 1 # всегда один маршрут на оффер # Helpers def allowed_next_phase(current_phase, transport_type): """ Phases — расширение радиуса поиска, ЖД не обязателен: - end_auto: можно 1 авто, rail, или сразу offer - end_auto_done: авто использовано — rail или offer - rail: любое кол-во rail, потом 1 авто или offer - start_auto_done: авто использовано — только offer Offer можно найти на любом этапе! """ if current_phase == 'end_auto': if transport_type == 'offer': return 'offer' # нашли сразу рядом if transport_type == 'auto': return 'end_auto_done' if transport_type == 'rail': return 'rail' return None if current_phase == 'end_auto_done': if transport_type == 'offer': return 'offer' # нашли после 1 авто if transport_type == 'rail': return 'rail' return None if current_phase == 'rail': if transport_type == 'offer': return 'offer' # нашли на ЖД станции if transport_type == 'rail': return 'rail' if transport_type == 'auto': return 'start_auto_done' return None if current_phase == 'start_auto_done': if transport_type == 'offer': return 'offer' return None return None def fetch_neighbors(node_key, phase): """Получить соседей с учётом допустимых типов транспорта.""" # offer доступен на всех фазах — ищем ближайший if phase == 'end_auto': types = ['auto', 'rail', 'offer'] elif phase == 'end_auto_done': types = ['rail', 'offer'] elif phase == 'rail': types = ['rail', 'auto', 'offer'] elif phase == 'start_auto_done': types = ['offer'] else: types = ['offer'] aql = """ FOR edge IN edges FILTER edge.transport_type IN @types FILTER edge._from == @node_id OR edge._to == @node_id LET neighbor_id = edge._from == @node_id ? edge._to : edge._from LET neighbor = DOCUMENT(neighbor_id) FILTER neighbor != null RETURN { neighbor_key: neighbor._key, neighbor_doc: neighbor, from_id: edge._from, to_id: edge._to, transport_type: edge.transport_type, distance_km: edge.distance_km, travel_time_seconds: edge.travel_time_seconds } """ cursor = db.aql.execute( aql, bind_vars={ 'node_id': f"nodes/{node_key}", 'types': types, }, ) return list(cursor) # Priority queue: (cost, seq, node_key, phase) queue = [] counter = 0 heapq.heappush(queue, (0, counter, to_uuid, 'end_auto')) visited = {} # (node, phase) -> best_cost predecessors = {} # (node, phase) -> (prev_node, prev_phase, edge_info) node_docs = {to_uuid: dest} found_routes = [] expansions = 0 while queue and len(found_routes) < max_sources and expansions < Query.MAX_EXPANSIONS: cost, _, node_key, phase = heapq.heappop(queue) if (node_key, phase) in visited and cost > visited[(node_key, phase)]: continue # Если нашли оффер нужного товара в допустимой фазе, фиксируем маршрут node_doc = node_docs.get(node_key) if node_doc and node_doc.get('product_uuid') == product_uuid: path_edges = [] state = (node_key, phase) current_key = node_key while state in predecessors: prev_state, edge_info = predecessors[state] prev_key = prev_state[0] path_edges.append((current_key, prev_key, edge_info)) # from source toward dest state = prev_state current_key = prev_key route = _build_route_from_edges(path_edges, node_docs) distance_km = None src_lat = node_doc.get('latitude') src_lon = node_doc.get('longitude') if src_lat is not None and src_lon is not None: distance_km = _distance_km(src_lat, src_lon, dest_lat, dest_lon) found_routes.append(ProductRouteOptionType( source_uuid=node_key, source_name=node_doc.get('name'), source_lat=node_doc.get('latitude'), source_lon=node_doc.get('longitude'), distance_km=distance_km, routes=[route] if route else [], )) # продолжаем искать остальных continue neighbors = fetch_neighbors(node_key, phase) expansions += 1 for neighbor in neighbors: transport_type = neighbor.get('transport_type') next_phase = allowed_next_phase(phase, transport_type) if next_phase is None: continue travel_time = neighbor.get('travel_time_seconds') distance_km = neighbor.get('distance_km') neighbor_key = neighbor.get('neighbor_key') node_docs[neighbor_key] = neighbor.get('neighbor_doc') step_cost = travel_time if travel_time is not None else (distance_km or 0) new_cost = cost + step_cost state_key = (neighbor_key, next_phase) if state_key in visited and new_cost >= visited[state_key]: continue visited[state_key] = new_cost counter += 1 heapq.heappush(queue, (new_cost, counter, neighbor_key, next_phase)) predecessors[state_key] = ((node_key, phase), neighbor) if not found_routes: logger.info("No product routes found for %s -> %s", product_uuid, to_uuid) return [] return found_routes schema = graphene.Schema(query=Query) # Helper methods attached to Query for route assembly def _build_stage(from_doc, to_doc, transport_type, edges): distance_km = sum(e.get('distance_km') or 0 for e in edges) travel_time = sum(e.get('travel_time_seconds') or 0 for e in edges) return RouteStageType( from_uuid=from_doc.get('_key') if from_doc else None, from_name=from_doc.get('name') if from_doc else None, from_lat=from_doc.get('latitude') if from_doc else None, from_lon=from_doc.get('longitude') if from_doc else None, to_uuid=to_doc.get('_key') if to_doc else None, to_name=to_doc.get('name') if to_doc else None, to_lat=to_doc.get('latitude') if to_doc else None, to_lon=to_doc.get('longitude') if to_doc else None, distance_km=distance_km, travel_time_seconds=travel_time, transport_type=transport_type, ) def _build_route_from_edges(path_edges, node_docs): """Собрать RoutePathType из списка ребёр (source->dest), схлопывая типы.""" if not path_edges: return None stages = [] current_edges = [] current_type = None segment_start = None for from_key, to_key, edge in path_edges: edge_type = edge.get('transport_type') if current_type is None: current_type = edge_type current_edges = [edge] segment_start = from_key elif edge_type == current_type: current_edges.append(edge) else: # закрываем предыдущий сегмент stages.append(_build_stage( node_docs.get(segment_start), node_docs.get(from_key), current_type, current_edges, )) current_type = edge_type current_edges = [edge] segment_start = from_key # Последний сгусток last_to = path_edges[-1][1] stages.append(_build_stage( node_docs.get(segment_start), node_docs.get(last_to), current_type, current_edges, )) total_distance = sum(s.distance_km or 0 for s in stages) total_time = sum(s.travel_time_seconds or 0 for s in stages) return RoutePathType( total_distance_km=total_distance, total_time_seconds=total_time, stages=stages, ) # Bind helpers to class for access in resolver Query._build_route_from_edges = _build_route_from_edges def _distance_km(lat1, lon1, lat2, lon2): """Haversine distance in km.""" r = 6371 d_lat = math.radians(lat2 - lat1) d_lon = math.radians(lon2 - lon1) a = ( math.sin(d_lat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(d_lon / 2) ** 2 ) c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) return r * c Query._distance_km = _distance_km