Initial commit from monorepo

This commit is contained in:
Ruslan Bakiev
2026-01-07 09:17:51 +07:00
commit eb2ffd50f2
13 changed files with 1190 additions and 0 deletions

1
geo_app/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Geo app - logistics graph operations."""

6
geo_app/apps.py Normal file
View File

@@ -0,0 +1,6 @@
from django.apps import AppConfig
class GeoAppConfig(AppConfig):
    """Django application configuration for the ``geo_app`` package."""

    # Field type Django uses for implicitly created primary keys.
    default_auto_field = 'django.db.models.BigAutoField'
    # Python path of the application this configuration applies to.
    name = 'geo_app'

49
geo_app/arango_client.py Normal file
View File

@@ -0,0 +1,49 @@
"""ArangoDB client singleton."""
import logging
from arango import ArangoClient
from django.conf import settings
logger = logging.getLogger(__name__)
_db = None
def get_db():
    """Return the shared ArangoDB database handle, creating it on first use.

    The handle is cached in the module-level ``_db`` so every later call
    returns the same object. Connection parameters are read from Django
    settings:

    * ``ARANGODB_INTERNAL_URL`` - host URL; ``http://`` is prepended when the
      value does not already start with ``http``.
    * ``ARANGODB_DATABASE`` - database name.
    * ``ARANGODB_PASSWORD`` - password.
    * ``ARANGODB_USERNAME`` - optional user name, defaults to ``'root'``.

    Returns:
        The database handle produced by ``ArangoClient.db``.
    """
    global _db
    if _db is not None:
        return _db

    hosts = settings.ARANGODB_INTERNAL_URL
    if not hosts.startswith("http"):
        hosts = f"http://{hosts}"

    client = ArangoClient(hosts=hosts)
    _db = client.db(
        settings.ARANGODB_DATABASE,
        # Previously hard-coded; the default keeps existing deployments working.
        username=getattr(settings, 'ARANGODB_USERNAME', 'root'),
        password=settings.ARANGODB_PASSWORD,
    )
    # NOTE(review): the handle is created without an explicit verification
    # request, so this message may not prove the server is reachable - confirm.
    logger.info(
        "Connected to ArangoDB: %s/%s",
        hosts,
        settings.ARANGODB_DATABASE,
    )
    return _db
def ensure_graph():
    """Return the ``optovia_graph`` named graph, creating it when missing.

    The graph is defined with a single edge definition: the ``edges``
    collection connecting documents of the ``nodes`` collection in both the
    "from" and the "to" role. K_SHORTEST_PATHS queries rely on this graph.
    """
    name = 'optovia_graph'
    database = get_db()

    if not database.has_graph(name):
        logger.info("Creating graph: %s", name)
        nodes_to_nodes = {
            'edge_collection': 'edges',
            'from_vertex_collections': ['nodes'],
            'to_vertex_collections': ['nodes'],
        }
        return database.create_graph(name, edge_definitions=[nodes_to_nodes])

    return database.graph(name)

763
geo_app/schema.py Normal file
View File

@@ -0,0 +1,763 @@
"""GraphQL schema for Geo service."""
import logging
import heapq
import math
import requests
import graphene
from django.conf import settings
from .arango_client import get_db, ensure_graph
logger = logging.getLogger(__name__)
class EdgeType(graphene.ObjectType):
    """Edge between two nodes (route).

    The edge is described from the point of view of its origin node: the
    ``to_*`` fields carry the node at the far end, the remaining fields are
    copied from the edge document itself.
    """

    # Key, name and coordinates of the node the edge leads to.
    to_uuid = graphene.String()
    to_name = graphene.String()
    to_latitude = graphene.Float()
    to_longitude = graphene.Float()
    # Edge attributes as stored on the edge document.
    distance_km = graphene.Float()
    travel_time_seconds = graphene.Int()
    transport_type = graphene.String()
class NodeType(graphene.ObjectType):
    """Logistics node with edges to neighbors."""

    # Filled from the node document's ``_key``.
    uuid = graphene.String()
    name = graphene.String()
    latitude = graphene.Float()
    longitude = graphene.Float()
    country = graphene.String()
    country_code = graphene.String()
    synced_at = graphene.String()
    # Resolvers substitute an empty list when the document has no value.
    transport_types = graphene.List(graphene.String)
    # Only the single-node resolver loads edges; list views pass an empty list.
    edges = graphene.List(EdgeType)
class NodeConnectionsType(graphene.ObjectType):
    """Auto + rail edges for a node, rail uses nearest rail node."""

    # The node the connections were requested for.
    hub = graphene.Field(NodeType)
    # The hub itself when it supports rail, otherwise the nearest rail node.
    rail_node = graphene.Field(NodeType)
    # Edges of transport type "auto" leaving the hub.
    auto_edges = graphene.List(EdgeType)
    # Edges of transport type "rail" leaving ``rail_node``.
    rail_edges = graphene.List(EdgeType)
class RouteType(graphene.ObjectType):
    """Route between two points with geometry."""

    # Route length in kilometres (the routing service's distance / 1000,
    # rounded to two decimals by the resolvers).
    distance_km = graphene.Float()
    # Coordinate list taken from the routing response's ``points.coordinates``.
    geometry = graphene.JSONString(description="GeoJSON LineString coordinates")
class RouteStageType(graphene.ObjectType):
    """Single stage in a multi-hop route."""

    # Node at which the stage starts.
    from_uuid = graphene.String()
    from_name = graphene.String()
    from_lat = graphene.Float()
    from_lon = graphene.Float()
    # Node at which the stage ends.
    to_uuid = graphene.String()
    to_name = graphene.String()
    to_lat = graphene.Float()
    to_lon = graphene.Float()
    # Distance, travel time and transport type of the stage.
    distance_km = graphene.Float()
    travel_time_seconds = graphene.Int()
    transport_type = graphene.String()
class RoutePathType(graphene.ObjectType):
    """Complete route through graph with multiple stages."""

    # Overall distance of the route.
    total_distance_km = graphene.Float()
    # Sum of the stages' travel times (missing values count as 0).
    total_time_seconds = graphene.Int()
    # Stages in travel order.
    stages = graphene.List(RouteStageType)
class ProductRouteOptionType(graphene.ObjectType):
    """Route options for a product source to the destination."""

    # The offer node the product can be shipped from.
    source_uuid = graphene.String()
    source_name = graphene.String()
    source_lat = graphene.Float()
    source_lon = graphene.Float()
    # Haversine distance between source and destination; None when the
    # source has no coordinates.
    distance_km = graphene.Float()
    # Routes from the source to the destination (the resolver currently
    # supplies at most one).
    routes = graphene.List(RoutePathType)
class Query(graphene.ObjectType):
    """Root query."""

    # Upper bound on node expansions performed by the graph search in
    # ``resolve_find_product_routes``.
    MAX_EXPANSIONS = 20000

    # Single node lookup, resolved by ``resolve_node``.
    node = graphene.Field(
        NodeType,
        uuid=graphene.String(required=True),
        description="Get node by UUID with all edges to neighbors",
    )
    # Paginated node list, resolved by ``resolve_nodes``.
    nodes = graphene.List(
        NodeType,
        description="Get all nodes (without edges for performance)",
        limit=graphene.Int(),
        offset=graphene.Int(),
        transport_type=graphene.String(),
    )
    # Node count using the same filter as ``nodes``.
    nodes_count = graphene.Int(
        transport_type=graphene.String(),
        description="Get total count of nodes (with optional transport filter)",
    )
    # Nearest auto and rail edges around a node.
    node_connections = graphene.Field(
        NodeConnectionsType,
        uuid=graphene.String(required=True),
        limit_auto=graphene.Int(default_value=12),
        limit_rail=graphene.Int(default_value=12),
        description="Get auto + rail edges for a node (rail uses nearest rail node)",
    )
    # Point-to-point road route from the external routing service.
    auto_route = graphene.Field(
        RouteType,
        from_lat=graphene.Float(required=True),
        from_lon=graphene.Float(required=True),
        to_lat=graphene.Float(required=True),
        to_lon=graphene.Float(required=True),
        description="Get auto route between two points via GraphHopper",
    )
    # Point-to-point rail route from the external routing service.
    rail_route = graphene.Field(
        RouteType,
        from_lat=graphene.Float(required=True),
        from_lon=graphene.Float(required=True),
        to_lat=graphene.Float(required=True),
        to_lon=graphene.Float(required=True),
        description="Get rail route between two points via OpenRailRouting",
    )
    # K shortest paths between two graph nodes.
    find_routes = graphene.List(
        RoutePathType,
        from_uuid=graphene.String(required=True),
        to_uuid=graphene.String(required=True),
        limit=graphene.Int(default_value=3),
        description="Find K shortest routes through graph between two nodes",
    )
    # Routes from nodes offering a product to a destination node.
    find_product_routes = graphene.List(
        ProductRouteOptionType,
        product_uuid=graphene.String(required=True),
        to_uuid=graphene.String(required=True),
        limit_sources=graphene.Int(default_value=3),
        limit_routes=graphene.Int(default_value=3),
        description="Find routes from product offer nodes to destination",
    )
@staticmethod
def _build_routes(db, from_uuid, to_uuid, limit):
    """Compute up to ``limit`` shortest routes between two nodes.

    Runs an ``ANY``-direction K_SHORTEST_PATHS query on ``optovia_graph``
    weighted by ``distance_km`` and converts every returned path into a
    ``RoutePathType`` with one ``RouteStageType`` per edge.

    Args:
        db: Database handle exposing ``aql.execute``.
        from_uuid: Key of the start node in the ``nodes`` collection.
        to_uuid: Key of the target node in the ``nodes`` collection.
        limit: Maximum number of paths to return.

    Returns:
        A list of ``RoutePathType``; empty when the query fails or no path
        exists.
    """
    aql = """
FOR path IN ANY K_SHORTEST_PATHS
@from_vertex TO @to_vertex
GRAPH 'optovia_graph'
OPTIONS { weightAttribute: 'distance_km' }
LIMIT @limit
RETURN {
vertices: path.vertices,
edges: path.edges,
weight: path.weight
}
"""
    try:
        cursor = db.aql.execute(
            aql,
            bind_vars={
                'from_vertex': f'nodes/{from_uuid}',
                'to_vertex': f'nodes/{to_uuid}',
                'limit': limit,
            },
        )
        paths = list(cursor)
    except Exception as e:
        # Best effort: a failed query is reported as "no routes".
        logger.error("K_SHORTEST_PATHS query failed: %s", e)
        return []

    if not paths:
        logger.info("No paths found from %s to %s", from_uuid, to_uuid)
        return []

    routes = []
    for path in paths:
        vertices = path.get('vertices') or []
        edges = path.get('edges') or []

        # The traversal direction is ANY, so an edge may be walked against
        # its stored _from/_to orientation. The vertex list is in travel
        # order, therefore edge i is taken to run from vertex i to vertex
        # i + 1. Fall back to the _from/_to lookup only if the path does not
        # have that shape.
        in_travel_order = len(vertices) == len(edges) + 1
        vertex_by_id = {v['_id']: v for v in vertices if v}

        stages = []
        for index, edge in enumerate(edges):
            if in_travel_order:
                from_node = vertices[index] or {}
                to_node = vertices[index + 1] or {}
            else:
                from_node = vertex_by_id.get(edge['_from'], {})
                to_node = vertex_by_id.get(edge['_to'], {})
            stages.append(RouteStageType(
                from_uuid=from_node.get('_key'),
                from_name=from_node.get('name'),
                from_lat=from_node.get('latitude'),
                from_lon=from_node.get('longitude'),
                to_uuid=to_node.get('_key'),
                to_name=to_node.get('name'),
                to_lat=to_node.get('latitude'),
                to_lon=to_node.get('longitude'),
                distance_km=edge.get('distance_km'),
                travel_time_seconds=edge.get('travel_time_seconds'),
                transport_type=edge.get('transport_type'),
            ))

        total_time = sum(s.travel_time_seconds or 0 for s in stages)
        routes.append(RoutePathType(
            total_distance_km=path.get('weight'),
            total_time_seconds=total_time,
            stages=stages,
        ))
    return routes
def resolve_node(self, info, uuid):
    """Return a single node together with all of its outgoing edges.

    Args:
        info: GraphQL resolve info (unused).
        uuid: Key of the node in the ``nodes`` collection.

    Returns:
        A ``NodeType`` whose ``edges`` list holds one ``EdgeType`` per edge
        document with ``_from`` equal to this node, or ``None`` when the node
        does not exist.
    """
    db = get_db()

    node = db.collection('nodes').get(uuid)
    if not node:
        return None

    # Outgoing edges joined with the document of the node they lead to.
    aql = """
FOR edge IN edges
FILTER edge._from == @from_id
LET to_node = DOCUMENT(edge._to)
RETURN {
to_uuid: to_node._key,
to_name: to_node.name,
to_latitude: to_node.latitude,
to_longitude: to_node.longitude,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds,
transport_type: edge.transport_type
}
"""
    cursor = db.aql.execute(aql, bind_vars={'from_id': f"nodes/{uuid}"})
    edges = list(cursor)
    logger.info("Node %s has %d edges", uuid, len(edges))

    return NodeType(
        uuid=node['_key'],
        name=node.get('name'),
        latitude=node.get('latitude'),
        longitude=node.get('longitude'),
        country=node.get('country'),
        country_code=node.get('country_code'),
        synced_at=node.get('synced_at'),
        transport_types=node.get('transport_types') or [],
        edges=[EdgeType(**e) for e in edges],
    )
def resolve_nodes(self, info, limit=None, offset=None, transport_type=None):
    """List logistics nodes sorted by name, without loading their edges.

    Only documents whose ``node_type`` is ``'logistics'`` or unset are
    returned. ``transport_type`` optionally restricts the result to nodes
    listing that type; ``offset``/``limit`` page through it (0 and 1000000
    when omitted).
    """
    query = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
LET types = node.transport_types != null ? node.transport_types : []
FILTER @transport_type == null OR @transport_type IN types
SORT node.name ASC
LIMIT @offset, @limit
RETURN node
"""
    bind_vars = {
        'transport_type': transport_type,
        'offset': offset if offset is not None else 0,
        'limit': limit if limit is not None else 1000000,
    }
    rows = get_db().aql.execute(query, bind_vars=bind_vars)

    result = [
        NodeType(
            uuid=doc['_key'],
            name=doc.get('name'),
            latitude=doc.get('latitude'),
            longitude=doc.get('longitude'),
            country=doc.get('country'),
            country_code=doc.get('country_code'),
            synced_at=doc.get('synced_at'),
            transport_types=doc.get('transport_types') or [],
            edges=[],  # edges are deliberately not loaded for list views
        )
        for doc in rows
    ]
    logger.info("Returning %d nodes", len(result))
    return result
def resolve_nodes_count(self, info, transport_type=None):
    """Count logistics nodes, optionally restricted to one transport type.

    Applies the same ``node_type`` / ``transport_types`` filters as the node
    list query and returns the single count value (0 if the cursor is empty).
    """
    query = """
FOR node IN nodes
FILTER node.node_type == 'logistics' OR node.node_type == null
LET types = node.transport_types != null ? node.transport_types : []
FILTER @transport_type == null OR @transport_type IN types
COLLECT WITH COUNT INTO length
RETURN length
"""
    params = {'transport_type': transport_type}
    rows = get_db().aql.execute(query, bind_vars=params)
    return next(rows, 0)
def resolve_node_connections(self, info, uuid, limit_auto=12, limit_rail=12):
    """Return the closest auto edges of a hub plus rail edges near it.

    Auto edges are read from the hub itself. Rail edges are read from the
    hub when its ``transport_types`` contain ``'rail'``; otherwise from the
    rail-capable node closest to the hub's coordinates. Both lists are
    sorted by ``distance_km`` and capped by ``limit_auto`` / ``limit_rail``.

    Returns ``None`` when the hub does not exist or the query yields nothing.
    """
    database = get_db()
    hub_doc = database.collection('nodes').get(uuid)
    if not hub_doc:
        return None

    query = """
LET auto_edges = (
FOR edge IN edges
FILTER edge._from == @from_id AND edge.transport_type == "auto"
LET to_node = DOCUMENT(edge._to)
FILTER to_node != null
SORT edge.distance_km ASC
LIMIT @limit_auto
RETURN {
to_uuid: to_node._key,
to_name: to_node.name,
to_latitude: to_node.latitude,
to_longitude: to_node.longitude,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds,
transport_type: edge.transport_type
}
)
LET hub_has_rail = @hub_has_rail
LET rail_node = hub_has_rail ? DOCUMENT(@from_id) : FIRST(
FOR node IN nodes
FILTER node.latitude != null AND node.longitude != null
FILTER 'rail' IN node.transport_types
SORT DISTANCE(@hub_lat, @hub_lon, node.latitude, node.longitude)
LIMIT 1
RETURN node
)
LET rail_edges = rail_node == null ? [] : (
FOR edge IN edges
FILTER edge._from == CONCAT("nodes/", rail_node._key) AND edge.transport_type == "rail"
LET to_node = DOCUMENT(edge._to)
FILTER to_node != null
SORT edge.distance_km ASC
LIMIT @limit_rail
RETURN {
to_uuid: to_node._key,
to_name: to_node.name,
to_latitude: to_node.latitude,
to_longitude: to_node.longitude,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds,
transport_type: edge.transport_type
}
)
RETURN {
hub: DOCUMENT(@from_id),
rail_node: rail_node,
auto_edges: auto_edges,
rail_edges: rail_edges
}
"""
    hub_transport = hub_doc.get('transport_types') or []
    params = {
        'from_id': f"nodes/{uuid}",
        'hub_lat': hub_doc.get('latitude'),
        'hub_lon': hub_doc.get('longitude'),
        'hub_has_rail': 'rail' in hub_transport,
        'limit_auto': limit_auto,
        'limit_rail': limit_rail,
    }
    row = next(database.aql.execute(query, bind_vars=params), None)
    if not row:
        return None

    def as_node(doc):
        """Convert a node document to NodeType (edges left empty)."""
        if doc:
            return NodeType(
                uuid=doc['_key'],
                name=doc.get('name'),
                latitude=doc.get('latitude'),
                longitude=doc.get('longitude'),
                country=doc.get('country'),
                country_code=doc.get('country_code'),
                synced_at=doc.get('synced_at'),
                transport_types=doc.get('transport_types') or [],
                edges=[],
            )
        return None

    def as_edges(items):
        """Convert a list of edge dicts (or None) to EdgeType objects."""
        return [EdgeType(**item) for item in items or []]

    return NodeConnectionsType(
        hub=as_node(row.get('hub')),
        rail_node=as_node(row.get('rail_node')),
        auto_edges=as_edges(row.get('auto_edges')),
        rail_edges=as_edges(row.get('rail_edges')),
    )
@staticmethod
def _fetch_route(url, params, timeout, service_name):
    """Call a routing ``/route`` endpoint and convert the first path.

    Both routing services are queried with the same kind of parameters and
    answer with the same ``paths[0].distance`` /
    ``paths[0].points.coordinates`` layout, so the request handling is shared.

    Args:
        url: Full URL of the ``/route`` endpoint.
        params: Query parameters for the request.
        timeout: Request timeout in seconds.
        service_name: Name used as prefix of the error log message.

    Returns:
        A ``RouteType`` built from the first returned path, or ``None`` when
        the request fails, the body is not valid JSON, or no path is present.
    """
    try:
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body, which is not guaranteed to be a
        # RequestException on every requests version.
        logger.error("%s request failed: %s", service_name, e)
        return None

    paths = data.get('paths') if isinstance(data, dict) else None
    if not paths:
        return None

    path = paths[0]
    # The service reports metres; expose kilometres with two decimals.
    distance_km = round(path.get('distance', 0) / 1000, 2)
    coordinates = path.get('points', {}).get('coordinates', [])
    return RouteType(
        distance_km=distance_km,
        geometry=coordinates,
    )


def resolve_auto_route(self, info, from_lat, from_lon, to_lat, to_lon):
    """Get an auto route between two points via GraphHopper.

    Returns a ``RouteType`` for the first path the service reports, or
    ``None`` when the request fails or no path is returned.
    """
    params = {
        'point': [f"{from_lat},{from_lon}", f"{to_lat},{to_lon}"],
        'profile': 'car',
        'instructions': 'false',
        'calc_points': 'true',
        'points_encoded': 'false',
    }
    return Query._fetch_route(
        f"{settings.GRAPHHOPPER_EXTERNAL_URL}/route",
        params,
        30,
        "GraphHopper",
    )


def resolve_rail_route(self, info, from_lat, from_lon, to_lat, to_lon):
    """Get a rail route between two points via OpenRailRouting.

    Returns a ``RouteType`` for the first path the service reports, or
    ``None`` when the request fails or no path is returned.
    """
    params = {
        'point': [f"{from_lat},{from_lon}", f"{to_lat},{to_lon}"],
        'profile': 'all_tracks',
        'calc_points': 'true',
        'points_encoded': 'false',
    }
    return Query._fetch_route(
        f"{settings.OPENRAILROUTING_EXTERNAL_URL}/route",
        params,
        60,
        "OpenRailRouting",
    )
def resolve_find_routes(self, info, from_uuid, to_uuid, limit=3):
    """Return up to ``limit`` shortest graph routes between two nodes.

    Makes sure the named graph exists, then delegates to
    ``Query._build_routes`` (ArangoDB K_SHORTEST_PATHS).
    """
    database = get_db()
    # The query addresses the named graph, so it has to exist beforehand.
    ensure_graph()
    routes = Query._build_routes(database, from_uuid, to_uuid, limit)
    return routes
def resolve_find_product_routes(self, info, product_uuid, to_uuid, limit_sources=3, limit_routes=3):
    """Find the nearest offers of a product and one route from each.

    The search is a cost-ordered priority-queue walk that starts at the
    destination and expands outwards over the ``edges`` collection (in both
    edge directions). Read from the destination, a route may consist of at
    most one auto leg, then any number of rail legs, then at most one auto
    leg; an ``offer`` edge may be taken in any of these phases.

    Args:
        info: GraphQL resolve info (unused).
        product_uuid: Product whose offer nodes are searched for; a node
            matches when its ``product_uuid`` attribute equals this value.
        to_uuid: Key of the destination node.
        limit_sources: Maximum number of distinct offer nodes to return
            (a falsy value falls back to 5).
        limit_routes: Accepted for interface compatibility; exactly one
            route is produced per offer, so the value is not used.

    Returns:
        A list of ``ProductRouteOptionType`` in the order the offers were
        reached (cheapest first). Empty when the destination is missing,
        has no coordinates, or no offer is reachable.
    """
    db = get_db()
    ensure_graph()  # make sure the graph exists; the traversal below is manual

    # Destination is the search origin and the reference for distances.
    dest = db.collection('nodes').get(to_uuid)
    if not dest:
        logger.info("Destination node %s not found", to_uuid)
        return []

    dest_lat = dest.get('latitude')
    dest_lon = dest.get('longitude')
    if dest_lat is None or dest_lon is None:
        logger.info("Destination node %s missing coordinates", to_uuid)
        return []

    max_sources = limit_sources or 5

    # phase -> {edge transport type -> phase after taking that edge}.
    #   end_auto:        nothing used yet; one auto, rail, or offer allowed
    #   end_auto_done:   destination-side auto leg used; rail or offer
    #   rail:            any number of rail, then one auto, or offer
    #   start_auto_done: source-side auto leg used; only offer
    # 'offer' itself is terminal and therefore has no entry.
    transitions = {
        'end_auto': {'auto': 'end_auto_done', 'rail': 'rail', 'offer': 'offer'},
        'end_auto_done': {'rail': 'rail', 'offer': 'offer'},
        'rail': {'rail': 'rail', 'auto': 'start_auto_done', 'offer': 'offer'},
        'start_auto_done': {'offer': 'offer'},
    }

    def fetch_neighbors(node_key, types):
        """Return neighbours of a node reachable over the given edge types."""
        aql = """
FOR edge IN edges
FILTER edge.transport_type IN @types
FILTER edge._from == @node_id OR edge._to == @node_id
LET neighbor_id = edge._from == @node_id ? edge._to : edge._from
LET neighbor = DOCUMENT(neighbor_id)
FILTER neighbor != null
RETURN {
neighbor_key: neighbor._key,
neighbor_doc: neighbor,
from_id: edge._from,
to_id: edge._to,
transport_type: edge.transport_type,
distance_km: edge.distance_km,
travel_time_seconds: edge.travel_time_seconds
}
"""
        cursor = db.aql.execute(
            aql,
            bind_vars={
                'node_id': f"nodes/{node_key}",
                'types': types,
            },
        )
        return list(cursor)

    # Heap entries: (cost, sequence number, node key, phase). The sequence
    # number breaks ties so that keys and phases are never compared.
    counter = 0
    queue = [(0, counter, to_uuid, 'end_auto')]
    best_cost = {}     # (node, phase) -> lowest cost pushed so far
    predecessors = {}  # (node, phase) -> ((prev_node, prev_phase), edge_info)
    node_docs = {to_uuid: dest}
    found_routes = []
    found_sources = set()  # offer nodes already reported
    expansions = 0

    while queue and len(found_routes) < max_sources and expansions < Query.MAX_EXPANSIONS:
        cost, _, node_key, phase = heapq.heappop(queue)
        state = (node_key, phase)
        # Skip stale heap entries superseded by a cheaper path.
        if cost > best_cost.get(state, cost):
            continue

        node_doc = node_docs.get(node_key)
        if node_doc and node_doc.get('product_uuid') == product_uuid:
            # The same offer can be reached in several phases; entries are
            # popped cheapest first, so only the first hit is reported.
            if node_key in found_sources:
                continue
            found_sources.add(node_key)

            # Walk the predecessor chain back to the destination; this
            # yields the edges ordered from the source towards it.
            path_edges = []
            while state in predecessors:
                prev_state, edge_info = predecessors[state]
                path_edges.append((state[0], prev_state[0], edge_info))
                state = prev_state

            route = _build_route_from_edges(path_edges, node_docs)
            src_lat = node_doc.get('latitude')
            src_lon = node_doc.get('longitude')
            distance_km = None
            if src_lat is not None and src_lon is not None:
                distance_km = _distance_km(src_lat, src_lon, dest_lat, dest_lon)

            found_routes.append(ProductRouteOptionType(
                source_uuid=node_key,
                source_name=node_doc.get('name'),
                source_lat=src_lat,
                source_lon=src_lon,
                distance_km=distance_km,
                routes=[route] if route else [],
            ))
            # Do not expand through an offer; keep looking for the others.
            continue

        allowed = transitions.get(phase)
        if not allowed:
            # Terminal 'offer' phase on a non-matching node: no edge may
            # follow, so the neighbour query would be discarded anyway.
            continue

        neighbors = fetch_neighbors(node_key, list(allowed))
        expansions += 1
        for neighbor in neighbors:
            next_phase = allowed.get(neighbor.get('transport_type'))
            if next_phase is None:
                continue

            travel_time = neighbor.get('travel_time_seconds')
            distance_km = neighbor.get('distance_km')
            neighbor_key = neighbor.get('neighbor_key')
            node_docs[neighbor_key] = neighbor.get('neighbor_doc')

            # NOTE(review): falls back to kilometres when the edge has no
            # travel time, so costs can mix seconds and km - confirm intent.
            step_cost = travel_time if travel_time is not None else (distance_km or 0)
            new_cost = cost + step_cost

            state_key = (neighbor_key, next_phase)
            if state_key in best_cost and new_cost >= best_cost[state_key]:
                continue

            best_cost[state_key] = new_cost
            predecessors[state_key] = ((node_key, phase), neighbor)
            counter += 1
            heapq.heappush(queue, (new_cost, counter, neighbor_key, next_phase))

    if not found_routes:
        logger.info("No product routes found for %s -> %s", product_uuid, to_uuid)
    return found_routes
# GraphQL schema of the Geo service; it exposes queries only.
schema = graphene.Schema(query=Query)
# Module-level helpers for route assembly (some are also bound onto Query below)
def _build_stage(from_doc, to_doc, transport_type, edges):
    """Build one RouteStageType covering a run of edges of a single type.

    ``from_doc`` / ``to_doc`` are the node documents at both ends (either
    may be missing, in which case the corresponding fields are ``None``).
    Distance and travel time are the sums over ``edges``; missing values
    count as 0.
    """
    start = from_doc or {}
    end = to_doc or {}

    leg_distances = [leg.get('distance_km') or 0 for leg in edges]
    leg_times = [leg.get('travel_time_seconds') or 0 for leg in edges]

    return RouteStageType(
        from_uuid=start.get('_key'),
        from_name=start.get('name'),
        from_lat=start.get('latitude'),
        from_lon=start.get('longitude'),
        to_uuid=end.get('_key'),
        to_name=end.get('name'),
        to_lat=end.get('latitude'),
        to_lon=end.get('longitude'),
        distance_km=sum(leg_distances),
        travel_time_seconds=sum(leg_times),
        transport_type=transport_type,
    )
def _build_route_from_edges(path_edges, node_docs):
    """Assemble a RoutePathType from edges ordered source -> destination.

    ``path_edges`` holds ``(from_key, to_key, edge)`` tuples. Consecutive
    edges with the same ``transport_type`` are collapsed into one stage.
    ``node_docs`` maps node keys to node documents. Returns ``None`` for an
    empty edge list.
    """
    if not path_edges:
        return None

    stages = []
    run_type = None    # transport type of the run being collected
    run_edges = []     # edges of that run
    run_start = None   # key of the node where the run begins

    def close_run(end_key):
        """Turn the current run into a stage ending at ``end_key``."""
        stages.append(_build_stage(
            node_docs.get(run_start),
            node_docs.get(end_key),
            run_type,
            run_edges,
        ))

    for from_key, _to_key, edge in path_edges:
        edge_type = edge.get('transport_type')
        if run_type is not None and edge_type == run_type:
            run_edges.append(edge)
            continue
        if run_type is not None:
            # Type changed: the finished run ends where this edge starts.
            close_run(from_key)
        run_type, run_edges, run_start = edge_type, [edge], from_key

    # The final run ends at the last edge's target node.
    close_run(path_edges[-1][1])

    distance_total = sum(stage.distance_km or 0 for stage in stages)
    time_total = sum(stage.travel_time_seconds or 0 for stage in stages)
    return RoutePathType(
        total_distance_km=distance_total,
        total_time_seconds=time_total,
        stages=stages,
    )
# Bind the helper to the class for access in resolvers. staticmethod keeps the
# two-argument signature whether it is looked up on Query or on an instance;
# a plain function would receive the instance as its first argument.
Query._build_route_from_edges = staticmethod(_build_route_from_edges)
def _distance_km(lat1, lon1, lat2, lon2):
    """Return the haversine (great-circle) distance between two points in km.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.

    Returns:
        The distance in kilometres on a sphere of radius 6371 km.
    """
    r = 6371  # sphere radius in km
    d_lat = math.radians(lat2 - lat1)
    d_lon = math.radians(lon2 - lon1)
    a = (
        math.sin(d_lat / 2) ** 2
        + math.cos(math.radians(lat1))
        * math.cos(math.radians(lat2))
        * math.sin(d_lon / 2) ** 2
    )
    # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
    # points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return r * c
# staticmethod keeps the four-argument signature whether the helper is looked
# up on Query or on an instance.
Query._distance_km = staticmethod(_distance_km)