Files
optovia/lang/agent/graph.py
2026-01-07 09:20:11 +07:00

108 lines
3.0 KiB
Python

import os
from typing import Any, Dict
from langchain.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import SystemMessage
from langchain_core.runnables import RunnableConfig
from mcp_client import McpInvoker
# Configure LLM (Gemini)
def build_llm() -> ChatGoogleGenerativeAI:
return ChatGoogleGenerativeAI(
model=os.getenv("GOOGLE_VERTEX_MODEL", "gemini-1.5-pro"),
temperature=float(os.getenv("LLM_TEMPERATURE", "0.3")),
)
def build_mcp() -> McpInvoker:
return McpInvoker(
command=os.getenv("MCP_COMMAND", "node"),
args=os.getenv("MCP_ARGS", "lang/mcp/src/index.js").split(),
)
def build_tools(mcp: McpInvoker):
@tool
def match_offers_with_route(
product_uuid: str,
destination_uuid: str,
limit_sources: int = 3,
limit_routes: int = 3,
user_token: str | None = None,
) -> Any:
"""Find active offers for a product and return route options to destination node."""
return mcp.call(
"match_offers_with_route",
{
"productUuid": product_uuid,
"destinationUuid": destination_uuid,
"limitSources": limit_sources,
"limitRoutes": limit_routes,
"userToken": user_token,
},
)
@tool
def order_timeline(order_uuid: str, user_token: str) -> Any:
"""Get order with stages/trips for a team. Requires user/team token."""
return mcp.call(
"order_timeline",
{
"orderUuid": order_uuid,
"userToken": user_token,
},
)
@tool
def search_nodes(
transport_type: str | None = None,
limit: int = 20,
offset: int = 0,
user_token: str | None = None,
) -> Any:
"""Search logistics nodes with optional transport filter."""
return mcp.call(
"search_nodes",
{
"transportType": transport_type,
"limit": limit,
"offset": offset,
"userToken": user_token,
},
)
return [match_offers_with_route, order_timeline, search_nodes]
def build_graph():
llm = build_llm()
mcp = build_mcp()
tools = build_tools(mcp)
system = SystemMessage(
content=(
"You are Optovia logistics agent. Prefer calling tools to answer. "
"Use match_offers_with_route for RFQ-style questions, order_timeline for shipment status, "
"and search_nodes to explore logistics locations. Keep answers concise."
)
)
app = create_react_agent(
llm,
tools,
state_modifier=lambda state: state + [system],
)
return app
graph_app = build_graph()
def invoke_graph(input_data: Dict[str, Any], config: RunnableConfig | None = None):
"""Convenience single-call invoke."""
return graph_app.invoke(input_data, config=config)