Compare commits

...

4 Commits

Author SHA1 Message Date
Ruslan Bakiev
569924cabb Seed from HS CSV and require real data
All checks were successful
Build Docker Image / build (push) Successful in 2m58s
2026-02-05 18:42:55 +07:00
Ruslan Bakiev
e8f11116a2 Enforce 1s delay in exchange seed 2026-02-05 15:01:13 +07:00
Ruslan Bakiev
4a8e69f3c3 Use GEO_INTERNAL_URL/EXTERNAL_URL for hubs 2026-02-05 14:51:54 +07:00
Ruslan Bakiev
0f857192d4 Use GEO_INTERNAL_URL for exchange seed 2026-02-05 14:51:05 +07:00

View File

@@ -2,13 +2,16 @@
Seed Suppliers and Offers for African cocoa belt.
Creates offers via Temporal workflow so they sync to the graph.
"""
import csv
import os
import random
import uuid
from pathlib import Path
from decimal import Decimal
import time
import requests
from django.core.management.base import BaseCommand
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from offers.models import Offer
@@ -65,19 +68,11 @@ SUPPLIER_NAMES = [
"Dar Coast Commodities", "Maputo Export House",
]
# Fixed product catalog (10 items) with realistic prices per ton (USD)
PRODUCT_CATALOG = [
{"name": "Cocoa Beans", "category": "Cocoa", "price": Decimal("2450.00")},
{"name": "Shea Butter", "category": "Oils & Fats", "price": Decimal("1800.00")},
{"name": "Cashew Nuts", "category": "Nuts", "price": Decimal("5200.00")},
{"name": "Palm Oil", "category": "Oils & Fats", "price": Decimal("980.00")},
{"name": "Coffee Beans", "category": "Coffee", "price": Decimal("3800.00")},
{"name": "Sesame Seeds", "category": "Seeds", "price": Decimal("2100.00")},
{"name": "Cotton", "category": "Fiber", "price": Decimal("1650.00")},
{"name": "Maize", "category": "Grains", "price": Decimal("260.00")},
{"name": "Sorghum", "category": "Grains", "price": Decimal("230.00")},
{"name": "Natural Rubber", "category": "Industrial", "price": Decimal("1750.00")},
]
# Default GLEIF Africa LEI dataset path (repo-local)
DEFAULT_GLEIF_PATH = "datasets/gleif/africa_lei_companies.csv"
# Default HS product mapping CSV (repo-local)
DEFAULT_HS_PRODUCTS_PATH = "datasets/hs/exchange_seed_product_hs_map.csv"
class Command(BaseCommand):
@@ -102,6 +97,12 @@ class Command(BaseCommand):
default=10,
help="How many distinct products to use (default: 10)",
)
parser.add_argument(
"--product-csv",
type=str,
default="",
help="Path to HS product CSV (defaults to datasets/hs/exchange_seed_product_hs_map.csv)",
)
parser.add_argument(
"--supplier-location-ratio",
type=float,
@@ -138,8 +139,8 @@ class Command(BaseCommand):
parser.add_argument(
"--geo-url",
type=str,
default="http://geo:8000/graphql/public/",
help="Geo service GraphQL URL (default: http://geo:8000/graphql/public/)",
default=None,
help="Geo service GraphQL URL (defaults to GEO_INTERNAL_URL env var)",
)
parser.add_argument(
"--odoo-url",
@@ -176,6 +177,12 @@ class Command(BaseCommand):
default=None,
help="Filter offers by product name (e.g., 'Cocoa Beans')",
)
parser.add_argument(
"--company-csv",
type=str,
default=None,
help="Path to CSV with real company names (default: datasets/gleif/africa_lei_companies.csv)",
)
def handle(self, *args, **options):
if options["clear"]:
@@ -193,26 +200,25 @@ class Command(BaseCommand):
use_workflow = not options["no_workflow"]
use_bulk = options["bulk"]
bulk_size = max(1, options["bulk_size"])
sleep_ms = max(0, options["sleep_ms"])
geo_url = options["geo_url"]
odoo_url = options["odoo_url"]
# Enforce fixed 1s delay to protect infra regardless of CLI flags
sleep_ms = 1000
geo_url = (
options["geo_url"]
or os.getenv("GEO_INTERNAL_URL")
or os.getenv("GEO_EXTERNAL_URL")
or os.getenv("GEO_URL")
)
if not geo_url:
self.stdout.write(self.style.ERROR("Geo URL is not set. Provide --geo-url or GEO_INTERNAL_URL."))
return
geo_url = self._normalize_geo_url(geo_url)
product_filter = options["product"]
ensure_products = options["ensure_products"]
odoo_db = options["odoo_db"]
odoo_user = options["odoo_user"]
odoo_password = options["odoo_password"]
product_csv = options["product_csv"]
company_csv = options["company_csv"]
# Fetch products from Odoo
self.stdout.write("Fetching products from Odoo...")
products = self._fetch_products_from_odoo(odoo_url, odoo_db, odoo_user, odoo_password)
if ensure_products:
self.stdout.write("Ensuring product catalog exists in Odoo...")
products = self._ensure_products_in_odoo(
odoo_url, odoo_db, odoo_user, odoo_password, products
)
if not products:
self.stdout.write(self.style.WARNING("No products found in Odoo. Falling back to catalog only."))
products = self._catalog_products()
# Load products from HS CSV
self.stdout.write("Loading products from HS CSV...")
products = self._load_product_pool(product_csv)
self.stdout.write(f"Found {len(products)} products")
# Filter by product name if specified
@@ -233,14 +239,13 @@ class Command(BaseCommand):
hubs = self._fetch_african_hubs(geo_url)
if not hubs:
self.stdout.write(self.style.WARNING(
"No African hubs found. Using default locations."
))
hubs = self._default_african_hubs()
self.stdout.write(self.style.ERROR("No African hubs found from geo service. Aborting seed."))
return
self.stdout.write(f"Found {len(hubs)} African hubs")
# Create suppliers
self._company_pool = self._load_company_pool(company_csv)
self.stdout.write(f"Creating {suppliers_count} suppliers...")
new_suppliers = self._create_suppliers(suppliers_count, hubs)
self.stdout.write(self.style.SUCCESS(f"Created {len(new_suppliers)} suppliers"))
@@ -264,8 +269,50 @@ class Command(BaseCommand):
)
self.stdout.write(self.style.SUCCESS(f"Created {len(created_offers)} offers"))
def _catalog_products(self) -> list:
return [(p["name"], p["category"], str(uuid.uuid4()), p["price"]) for p in PRODUCT_CATALOG]
def _find_default_product_csv(self) -> str | None:
"""Locate default HS product CSV in repo (datasets/hs/exchange_seed_product_hs_map.csv)."""
here = Path(__file__).resolve()
for parent in here.parents:
candidate = parent / DEFAULT_HS_PRODUCTS_PATH
if candidate.exists():
return str(candidate)
return None
def _load_product_pool(self, csv_path: str | None) -> list[tuple]:
"""Load real product names from HS CSV; returns list of tuples."""
path = csv_path or self._find_default_product_csv()
if not path or not os.path.exists(path):
raise CommandError(
"HS product CSV not found. Seed requires real product data; "
"ensure datasets/hs/exchange_seed_product_hs_map.csv is available."
)
products: list[tuple] = []
seen = set()
try:
with open(path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
name = (row.get("product") or "").strip()
hs6 = (row.get("hs6") or "").strip()
label = (row.get("label") or "").strip()
if not name or not hs6:
continue
key = (name, hs6)
if key in seen:
continue
seen.add(key)
product_uuid = self._stable_uuid("hs6", hs6)
category = label or name
products.append((name, category, product_uuid, Decimal("1000.00")))
except Exception as e:
raise CommandError(f"Failed to read HS product CSV: {e}")
if not products:
raise CommandError("HS product CSV is empty. Seed requires real product data.")
random.shuffle(products)
return products
def _fetch_products_from_odoo(self, odoo_url: str, odoo_db: str, odoo_user: int, odoo_password: str) -> list:
"""Fetch products from Odoo via JSON-RPC"""
@@ -572,15 +619,28 @@ class Command(BaseCommand):
lat += random.uniform(-0.5, 0.5)
lng += random.uniform(-0.5, 0.5)
name = self._generate_supplier_name(idx)
company = self._pick_company(idx)
if company:
name = company["name"]
company_code = company.get("country_code")
mapped_name = self._country_name_from_code(company_code)
if mapped_name:
country = mapped_name
country_code = company_code
supplier_uuid = self._stable_uuid("supplier", company.get("lei") or name)
team_uuid = self._stable_uuid("team", company.get("lei") or name)
else:
name = self._generate_supplier_name(idx)
supplier_uuid = str(uuid.uuid4())
team_uuid = str(uuid.uuid4())
description = (
f"{name} is a reliable supplier based in {country}, "
"focused on consistent quality and transparent logistics."
)
profile = SupplierProfile.objects.create(
uuid=str(uuid.uuid4()),
team_uuid=str(uuid.uuid4()),
uuid=supplier_uuid,
team_uuid=team_uuid,
name=name,
description=description,
country=country,
@@ -595,10 +655,79 @@ class Command(BaseCommand):
return created
def _generate_supplier_name(self, index: int) -> str:
"""Pick a realistic supplier name; fall back if list is exhausted."""
if index < len(SUPPLIER_NAMES):
return SUPPLIER_NAMES[index]
return f"{random.choice(SUPPLIER_NAMES)} Group"
raise CommandError("Supplier name fallback is disabled. Provide real company CSV.")
def _find_default_company_csv(self) -> str | None:
"""Locate default company CSV in repo (datasets/gleif/africa_lei_companies.csv)."""
here = Path(__file__).resolve()
for parent in here.parents:
candidate = parent / DEFAULT_GLEIF_PATH
if candidate.exists():
return str(candidate)
return None
def _load_company_pool(self, csv_path: str | None) -> list[dict]:
"""Load real company names from CSV; returns list of dicts."""
path = csv_path or self._find_default_company_csv()
if not path or not os.path.exists(path):
raise CommandError(
"Company CSV not found. Seed requires real company names; "
"ensure datasets/gleif/africa_lei_companies.csv is available."
)
companies = []
seen = set()
try:
with open(path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
name = (row.get("entity_name") or "").strip()
if not name:
continue
if name in seen:
continue
seen.add(name)
companies.append(
{
"name": name,
"lei": (row.get("lei") or "").strip(),
"country_code": (row.get("legal_address_country") or row.get("headquarters_country") or "").strip(),
"city": (row.get("legal_address_city") or row.get("headquarters_city") or "").strip(),
}
)
except Exception as e:
raise CommandError(f"Failed to read company CSV: {e}")
random.shuffle(companies)
self.stdout.write(f"Loaded {len(companies)} company names from CSV")
return companies
def _pick_company(self, index: int) -> dict | None:
if not getattr(self, "_company_pool", None):
raise CommandError("Company pool is empty. Seed requires real company CSV.")
if index < len(self._company_pool):
return self._company_pool[index]
return random.choice(self._company_pool)
def _stable_uuid(self, prefix: str, value: str) -> str:
return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{prefix}:{value}"))
def _country_name_from_code(self, code: str | None) -> str | None:
if not code:
return None
for name, country_code, _, _ in AFRICAN_COUNTRIES:
if country_code == code:
return name
return None
def _normalize_geo_url(self, url: str) -> str:
"""Ensure geo URL has scheme and GraphQL path."""
value = url.strip()
if not value.startswith(("http://", "https://")):
value = f"http://{value}"
if "/graphql" not in value:
value = value.rstrip("/") + "/graphql/public/"
return value
def _price_for_product(self, product_name: str) -> Decimal:
for item in PRODUCT_CATALOG: