Files
exchange/offers/management/commands/seed_exchange.py
Ruslan Bakiev 569924cabb
All checks were successful
Build Docker Image / build (push) Successful in 2m58s
Seed from HS CSV and require real data
2026-02-05 18:42:55 +07:00

914 lines
36 KiB
Python

"""
Seed Suppliers and Offers for African cocoa belt.
Creates offers via Temporal workflow so they sync to the graph.
"""
import csv
import os
import random
import uuid
from pathlib import Path
from decimal import Decimal
import time
import requests
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from offers.models import Offer
from offers.services import OfferService, OfferData
from suppliers.models import SupplierProfile
# Countries used for seeding, one tuple per country:
# (country name, two-letter country code, latitude, longitude).
# The trailing comment on each row is the city the coordinates are labelled with.
AFRICAN_COUNTRIES = [
    # NOTE(review): labelled Abidjan, but the coordinates look like Yamoussoukro — confirm.
    ("Côte d'Ivoire", "CI", 6.8276, -5.2893),  # Abidjan
    ("Ghana", "GH", 5.6037, -0.1870),  # Accra
    ("Nigeria", "NG", 6.5244, 3.3792),  # Lagos
    ("Cameroon", "CM", 4.0511, 9.7679),  # Douala
    ("Togo", "TG", 6.1725, 1.2314),  # Lomé
]
# Realistic supplier names (English, Africa-focused).
# NOTE: nothing in the visible part of this module reads this list;
# `Command._generate_supplier_name` raises instead of drawing a name from it,
# and supplier names come from the company CSV.
SUPPLIER_NAMES = [
"Cocoa Coast Exports", "Golden Savannah Trading", "Abidjan Agro Partners",
"Volta River Commodities", "Lagos Harbor Supply", "Accra Prime Exports",
"Tema Logistics & Trading", "Sahel Harvest Group", "Nile Delta Commodities",
"Gulf of Guinea Traders", "Kumasi Cocoa Collective", "Benin AgroLink",
"Douala Growth Partners", "Westbridge Commodities", "Ivory Gate Exporters",
"Ghana Frontier Trading", "Sunrise Agro Holdings", "Coastal Belt Supply",
"Keta Shore Commodities", "Takoradi Export House", "Mango Bay Trading",
"Savanna Crest Exports", "Sankofa Trade Corp", "Niger Delta Agrimark",
"Lake Volta Produce", "Zou River Exports", "Lomé Port Traders",
"Atlantic Harvest Co", "Forest Belt Commodities", "Côte d'Ivoire Supply",
"Ashanti Agro Trade", "Midland Cocoa Group", "Sahelian Produce Traders",
"Kintampo Agro Partners", "Gold Coast Exporters", "Cashew Ridge Trading",
"Prairie Coast Supply", "Harborline Exports", "Palm Coast Commodities",
"Green Belt Trading", "Westland Agro Link", "Delta Coast Produce",
"Kongo River Exports", "Bight of Benin Supply", "Akwa Ibom Traders",
"Cameroon Highlands Trading", "Coastal Plains Export", "Guinea Gulf Trading",
"Korhogo Agro Supply", "Northern Plains Traders", "Oti River Exports",
"Eastern Coast Commodities", "Sunset Bay Exporters", "Freetown Agro Trade",
"Makola Market Supply", "Afram Plains Trading", "Cedar Coast Commodities",
"Monrovia Export House", "Bissau Agro Partners", "Lac Togo Traders",
"Riverine Agro Link", "Cape Coast Exporters", "Delta Rise Commodities",
"Mali Savanna Trade", "Burkina Harvest Co", "Niger Basin Exports",
"Sierra Green Trading", "Liberia Agro Collective", "Congo Gate Traders",
"Ashanti Heritage Exports", "Ivory Belt Trading", "Sahel Horizon Supply",
"Atlantic Crest Commodities", "Green Valley Export", "Cocoa Ridge Trade",
"Palm Grove Exports", "Keta Delta Trading", "Lagoon Coast Commodities",
"Accra Trade Works", "Tema Export Alliance", "Lagos Trade Link",
"Cape Three Points Exports", "Ivory Coast Agro Hub", "Savanna Trade Network",
"Nile Coast Commodities", "Sahara Edge Trading", "Goldleaf Exports",
"Makeni Agro Partners", "Bamako Produce Traders", "Ouagadougou Exports",
"Conakry Trade House", "Port Harcourt Supply", "Calabar Exporters",
"Abuja Agro Traders", "Eko Commodities", "Gabon Forest Trade",
"Libreville Export Group", "Senegal River Commodities", "Dakar Trade Alliance",
"Kaolack Agro Supply", "Saint-Louis Exporters", "Zanzibar Coast Trading",
"Kilwa Harvest Group", "Lake Victoria Exports", "Mombasa Trade Gate",
"Dar Coast Commodities", "Maputo Export House",
]
# Default GLEIF Africa LEI dataset path (repo-local).
# Relative path: `Command._find_default_company_csv` joins it onto each
# ancestor directory of this file and returns the first candidate that exists.
DEFAULT_GLEIF_PATH = "datasets/gleif/africa_lei_companies.csv"
# Default HS product mapping CSV (repo-local).
# Relative path: `Command._find_default_product_csv` resolves it the same way,
# by probing each ancestor directory of this file.
DEFAULT_HS_PRODUCTS_PATH = "datasets/hs/exchange_seed_product_hs_map.csv"
class Command(BaseCommand):
help = "Seed Suppliers and Offers for African cocoa belt with workflow sync"
def add_arguments(self, parser):
    """Register every command-line option of the seed command on *parser*.

    Options are declared as (flag, keyword-arguments) pairs and registered
    in declaration order.

    NOTE: `handle` currently replaces the per-offer delay with a fixed
    1000 ms, so the value parsed for ``--sleep-ms`` is not what is used.
    """
    switch = {"action": "store_true"}
    option_table = (
        ("--suppliers", dict(type=int, default=10,
                             help="How many suppliers to create (default: 10)")),
        ("--offers", dict(type=int, default=50,
                          help="How many offers to create (default: 50)")),
        ("--product-count", dict(type=int, default=10,
                                 help="How many distinct products to use (default: 10)")),
        ("--product-csv", dict(
            type=str, default="",
            help="Path to HS product CSV (defaults to datasets/hs/exchange_seed_product_hs_map.csv)")),
        ("--supplier-location-ratio", dict(
            type=float, default=0.8,
            help="Share of offers that use supplier address (default: 0.8)")),
        ("--clear", dict(switch,
                         help="Delete all existing suppliers and offers before seeding")),
        ("--no-workflow", dict(
            switch, help="Create offers directly in DB without workflow (no graph sync)")),
        ("--bulk", dict(switch,
                        help="Use bulk_create for offers (only with --no-workflow)")),
        ("--bulk-size", dict(type=int, default=200,
                             help="Batch size for bulk_create (default: 200)")),
        ("--sleep-ms", dict(
            type=int, default=0,
            help="Sleep between offer creations in milliseconds (default: 0)")),
        ("--geo-url", dict(
            type=str, default=None,
            help="Geo service GraphQL URL (defaults to GEO_INTERNAL_URL env var)")),
        ("--odoo-url", dict(type=str, default="http://odoo:8069",
                            help="Odoo URL (default: http://odoo:8069)")),
        ("--ensure-products", dict(
            switch, help="Ensure product catalog exists in Odoo (create if missing)")),
        ("--odoo-db", dict(type=str, default="odoo",
                           help="Odoo database name (default: odoo)")),
        ("--odoo-user", dict(type=int, default=2,
                             help="Odoo user id (default: 2)")),
        ("--odoo-password", dict(type=str, default="admin",
                                 help="Odoo password (default: admin)")),
        ("--product", dict(
            type=str, default=None,
            help="Filter offers by product name (e.g., 'Cocoa Beans')")),
        ("--company-csv", dict(
            type=str, default=None,
            help="Path to CSV with real company names (default: datasets/gleif/africa_lei_companies.csv)")),
    )
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)
def handle(self, *args, **options):
    """Run the seed: optionally clear data, then create suppliers and offers.

    Steps, in order: reject the unsupported ``--bulk`` without
    ``--no-workflow`` combination, apply ``--clear``, resolve the geo
    service URL, load and sample products from the HS CSV, fetch hubs from
    the geo service, create suppliers from the company CSV, and create
    offers through the selected path (workflow, direct, or direct bulk).

    A missing geo URL, an empty product filter result, an empty hub list
    and the unsupported flag combination are written to stdout as errors
    and end the command early. The CSV loaders raise ``CommandError``.
    """
    use_workflow = not options["no_workflow"]
    use_bulk = options["bulk"]
    # Validate the flag combination first: previously this was only checked
    # after --clear had deleted data and new suppliers had been created.
    if use_workflow and use_bulk:
        self.stdout.write(self.style.ERROR("Bulk mode is only supported with --no-workflow."))
        return

    if options["clear"]:
        # Both deletes succeed or fail together.
        with transaction.atomic():
            offers_deleted, _ = Offer.objects.all().delete()
            suppliers_deleted, _ = SupplierProfile.objects.all().delete()
        self.stdout.write(self.style.WARNING(
            f"Deleted {suppliers_deleted} supplier profiles and {offers_deleted} offers"
        ))

    # Clamp numeric options into usable ranges.
    suppliers_count = max(0, options["suppliers"])
    offers_count = max(0, options["offers"])
    product_count = max(1, options["product_count"])
    supplier_location_ratio = min(max(options["supplier_location_ratio"], 0.0), 1.0)
    bulk_size = max(1, options["bulk_size"])
    # Enforce fixed 1s delay to protect infra regardless of CLI flags
    # (the parsed --sleep-ms value is deliberately not read).
    sleep_ms = 1000

    geo_url = (
        options["geo_url"]
        or os.getenv("GEO_INTERNAL_URL")
        or os.getenv("GEO_EXTERNAL_URL")
        or os.getenv("GEO_URL")
    )
    if not geo_url:
        self.stdout.write(self.style.ERROR("Geo URL is not set. Provide --geo-url or GEO_INTERNAL_URL."))
        return
    geo_url = self._normalize_geo_url(geo_url)

    product_filter = options["product"]
    product_csv = options["product_csv"]
    company_csv = options["company_csv"]

    # Load products from HS CSV
    self.stdout.write("Loading products from HS CSV...")
    products = self._load_product_pool(product_csv)
    self.stdout.write(f"Found {len(products)} products")

    # Filter by product name if specified (case-insensitive substring match)
    if product_filter:
        products = [p for p in products if product_filter.lower() in p[0].lower()]
        if not products:
            self.stdout.write(self.style.ERROR(f"No products matching '{product_filter}' found."))
            return
        self.stdout.write(f"Filtered to {len(products)} products matching '{product_filter}'")

    # Limit to product_count distinct items (random sample if possible)
    if len(products) > product_count:
        products = random.sample(products, product_count)
    self.stdout.write(f"Using {len(products)} products for seeding")

    # Fetch African hubs from geo service
    self.stdout.write("Fetching African hubs from geo service...")
    hubs = self._fetch_african_hubs(geo_url)
    if not hubs:
        self.stdout.write(self.style.ERROR("No African hubs found from geo service. Aborting seed."))
        return
    self.stdout.write(f"Found {len(hubs)} African hubs")

    # Create suppliers; _pick_company reads the pool from self._company_pool
    self._company_pool = self._load_company_pool(company_csv)
    self.stdout.write(f"Creating {suppliers_count} suppliers...")
    new_suppliers = self._create_suppliers(suppliers_count, hubs)
    self.stdout.write(self.style.SUCCESS(f"Created {len(new_suppliers)} suppliers"))

    # Create offers
    self.stdout.write(f"Creating {offers_count} offers (workflow={use_workflow})...")
    if use_workflow:
        created_offers = self._create_offers_via_workflow(
            offers_count, hubs, products, supplier_location_ratio, sleep_ms
        )
    elif use_bulk:
        created_offers = self._create_offers_direct_bulk(
            offers_count, hubs, products, supplier_location_ratio, bulk_size
        )
    else:
        created_offers = self._create_offers_direct(
            offers_count, hubs, products, supplier_location_ratio, sleep_ms
        )
    self.stdout.write(self.style.SUCCESS(f"Created {len(created_offers)} offers"))
def _find_default_product_csv(self) -> str | None:
    """Return the first existing default HS product CSV, or None.

    Probes ``<ancestor>/DEFAULT_HS_PRODUCTS_PATH`` for every ancestor
    directory of this file, nearest first.
    """
    ancestors = Path(__file__).resolve().parents
    candidates = (folder / DEFAULT_HS_PRODUCTS_PATH for folder in ancestors)
    return next((str(found) for found in candidates if found.exists()), None)
def _load_product_pool(self, csv_path: str | None) -> list[tuple]:
    """Load products from the HS CSV and return them in random order.

    Args:
        csv_path: Explicit CSV path; when empty or None the repo default
            located by `_find_default_product_csv` is used.

    Returns:
        A shuffled list of ``(name, category, product_uuid, price)`` tuples.
        ``name`` comes from the ``product`` column, ``category`` is the
        ``label`` column (falling back to the name), ``product_uuid`` is a
        stable UUID derived from the ``hs6`` column, and ``price`` is always
        ``Decimal("1000.00")``.

    Raises:
        CommandError: the file is missing, cannot be read, or yields no
            usable rows.
    """
    path = csv_path or self._find_default_product_csv()
    if not path or not os.path.exists(path):
        raise CommandError(
            "HS product CSV not found. Seed requires real product data; "
            "ensure datasets/hs/exchange_seed_product_hs_map.csv is available."
        )
    products: list[tuple] = []
    seen: set[tuple[str, str]] = set()
    try:
        # utf-8-sig also reads plain UTF-8 but strips a leading BOM; with a
        # BOM the first header would not match "product" and every row
        # would be skipped.
        with open(path, newline="", encoding="utf-8-sig") as f:
            for row in csv.DictReader(f):
                name = (row.get("product") or "").strip()
                hs6 = (row.get("hs6") or "").strip()
                label = (row.get("label") or "").strip()
                if not name or not hs6:
                    continue
                key = (name, hs6)
                if key in seen:
                    continue
                seen.add(key)
                # NOTE(review): the UUID depends on hs6 only, so two product
                # names sharing an hs6 get the same product_uuid — confirm.
                product_uuid = self._stable_uuid("hs6", hs6)
                category = label or name
                products.append((name, category, product_uuid, Decimal("1000.00")))
    except (OSError, UnicodeError, csv.Error) as e:
        # Only I/O, decoding and CSV parsing failures are reported as a read
        # failure; the original exception stays attached as the cause.
        raise CommandError(f"Failed to read HS product CSV: {e}") from e
    if not products:
        raise CommandError("HS product CSV is empty. Seed requires real product data.")
    random.shuffle(products)
    return products
def _fetch_products_from_odoo(self, odoo_url: str, odoo_db: str, odoo_user: int, odoo_password: str) -> list:
    """Fetch products from Odoo via JSON-RPC.

    Sends one ``search_read`` call for the ``products.product`` model to
    ``{odoo_url}/jsonrpc``.
    NOTE(review): the model name is ``products.product`` while categories
    use ``product.category`` — confirm this is the intended model.

    Returns:
        A list of ``(name, category_name, uuid, price)`` tuples. The list is
        empty (or partial) when the request fails; failures are reported as
        warnings on stdout rather than raised.
    """
    products = []
    payload = {
        "jsonrpc": "2.0",
        "method": "call",
        "params": {
            "service": "object",
            "method": "execute_kw",
            "args": [
                odoo_db,  # database
                odoo_user,  # uid
                odoo_password,  # password
                "products.product",  # model
                "search_read",
                [[]],  # domain (all products)
                {"fields": ["uuid", "name", "category_id"]},
            ],
        },
        "id": 1,
    }
    try:
        response = requests.post(f"{odoo_url}/jsonrpc", json=payload, timeout=10)
        # Report failures instead of silently returning an empty list.
        if response.status_code != 200:
            self.stdout.write(self.style.WARNING(
                f"Failed to fetch products from Odoo: HTTP {response.status_code}"
            ))
            return products
        data = response.json()
        if data.get("error"):
            self.stdout.write(self.style.WARNING(
                f"Failed to fetch products from Odoo: {data['error']}"
            ))
            return products
        for p in data.get("result") or []:
            name = p.get("name")
            if not name:
                # Skip nameless rows instead of aborting the whole loop.
                continue
            category = p.get("category_id")
            # category_id is read as a sequence whose second item is the name.
            category_name = category[1] if category else "Agriculture"
            price = self._price_for_product(name)
            products.append((name, category_name, p.get("uuid") or str(uuid.uuid4()), price))
    except Exception as e:
        # Best-effort: keep whatever was collected and report the problem.
        self.stdout.write(self.style.WARNING(f"Failed to fetch products from Odoo: {e}"))
    return products
def _ensure_products_in_odoo(
self, odoo_url: str, odoo_db: str, odoo_user: int, odoo_password: str, existing: list
) -> list:
"""Ensure PRODUCT_CATALOG exists in Odoo, return unified list."""
existing_names = {p[0] for p in existing}
products = list(existing)
for item in PRODUCT_CATALOG:
if item["name"] in existing_names:
continue
try:
# Find or create category
category_id = self._get_or_create_category(
odoo_url, odoo_db, odoo_user, odoo_password, item["category"]
)
response = requests.post(
f"{odoo_url}/jsonrpc",
json={
"jsonrpc": "2.0",
"method": "call",
"params": {
"service": "object",
"method": "execute_kw",
"args": [
odoo_db,
odoo_user,
odoo_password,
"products.product",
"create",
[
{
"name": item["name"],
"category_id": category_id,
"uuid": str(uuid.uuid4()),
}
],
],
},
"id": 1,
},
timeout=10,
)
if response.status_code == 200 and response.json().get("result"):
created_uuid = self._fetch_product_uuid(
odoo_url, odoo_db, odoo_user, odoo_password, item["name"]
)
products.append((
item["name"],
item["category"],
created_uuid or str(uuid.uuid4()),
item["price"],
))
except Exception as e:
self.stdout.write(self.style.WARNING(f"Failed to create product {item['name']}: {e}"))
return products
def _fetch_product_uuid(
    self, odoo_url: str, odoo_db: str, odoo_user: int, odoo_password: str, name: str
) -> str | None:
    """Look up the ``uuid`` field of the Odoo product called *name*.

    Issues one JSON-RPC ``search_read`` (limit 1) on ``products.product``.
    Returns the uuid when the reply is HTTP 200 and the first row has a
    truthy ``uuid``; otherwise returns None.
    """
    rpc_args = [
        odoo_db,
        odoo_user,
        odoo_password,
        "products.product",
        "search_read",
        [[("name", "=", name)]],
        {"fields": ["uuid"], "limit": 1},
    ]
    envelope = {
        "jsonrpc": "2.0",
        "method": "call",
        "params": {"service": "object", "method": "execute_kw", "args": rpc_args},
        "id": 1,
    }
    reply = requests.post(f"{odoo_url}/jsonrpc", json=envelope, timeout=10)
    if reply.status_code != 200:
        return None
    rows = reply.json().get("result", [])
    if not rows:
        return None
    return rows[0].get("uuid") or None
def _get_or_create_category(
    self, odoo_url: str, odoo_db: str, odoo_user: int, odoo_password: str, name: str
) -> int:
    """Return the id of the Odoo ``product.category`` called *name*.

    Searches by exact name first (limit 1). When the search is not HTTP 200
    or finds nothing, a ``create`` call is sent and its ``result`` is
    returned, defaulting to 1 when the reply carries no result.
    """
    endpoint = f"{odoo_url}/jsonrpc"

    def rpc(operation, *operands):
        # Build the execute_kw envelope for one product.category operation.
        return {
            "jsonrpc": "2.0",
            "method": "call",
            "params": {
                "service": "object",
                "method": "execute_kw",
                "args": [
                    odoo_db,
                    odoo_user,
                    odoo_password,
                    "product.category",
                    operation,
                    *operands,
                ],
            },
            "id": 1,
        }

    lookup = requests.post(
        endpoint,
        json=rpc("search", [[("name", "=", name)]], {"limit": 1}),
        timeout=10,
    )
    if lookup.status_code == 200:
        matches = lookup.json().get("result")
        if matches:
            return matches[0]

    creation = requests.post(endpoint, json=rpc("create", [{"name": name}]), timeout=10)
    return creation.json().get("result", 1)
def _fetch_african_hubs(self, geo_url: str) -> list:
    """Fetch African hubs from geo service via GraphQL.

    Gets all nodes (limit 5000) and filters by African countries in Python
    since the GraphQL schema doesn't support country filter.

    Returns:
        The node dicts whose ``country`` is in the allow-list below, or an
        empty list when the request fails. Failures (non-200 status,
        GraphQL errors, exceptions) are reported as warnings on stdout.
    """
    african_countries = {
        "Côte d'Ivoire", "Ivory Coast", "Ghana", "Nigeria",
        "Cameroon", "Togo", "Senegal", "Mali", "Burkina Faso",
        "Guinea", "Benin", "Niger", "Sierra Leone", "Liberia",
    }
    query = """
    query GetNodes($limit: Int) {
        nodes(limit: $limit) {
            uuid
            name
            country
            countryCode
            latitude
            longitude
        }
    }
    """
    try:
        response = requests.post(
            geo_url,
            json={"query": query, "variables": {"limit": 5000}},
            timeout=30,
        )
        if response.status_code != 200:
            # Previously a non-200 reply produced an empty list with no hint why.
            self.stdout.write(self.style.WARNING(
                f"Failed to fetch hubs: HTTP {response.status_code}"
            ))
            return []
        data = response.json()
        if "errors" in data:
            self.stdout.write(self.style.WARNING(f"GraphQL errors: {data['errors']}"))
            return []
        # Tolerate "data": null and "nodes": null in the payload.
        nodes = (data.get("data") or {}).get("nodes") or []
        # Filter by African countries
        return [n for n in nodes if n.get("country") in african_countries]
    except Exception as e:
        # Best-effort: the caller treats an empty list as "no hubs".
        self.stdout.write(self.style.WARNING(f"Failed to fetch hubs: {e}"))
    return []
def _default_african_hubs(self) -> list:
"""Default African hubs if geo service is unavailable"""
return [
{
"uuid": str(uuid.uuid4()),
"name": "Port of Abidjan",
"country": "Côte d'Ivoire",
"countryCode": "CI",
"latitude": 5.3167,
"longitude": -4.0167,
},
{
"uuid": str(uuid.uuid4()),
"name": "Port of San Pedro",
"country": "Côte d'Ivoire",
"countryCode": "CI",
"latitude": 4.7500,
"longitude": -6.6333,
},
{
"uuid": str(uuid.uuid4()),
"name": "Port of Tema",
"country": "Ghana",
"countryCode": "GH",
"latitude": 5.6333,
"longitude": -0.0167,
},
{
"uuid": str(uuid.uuid4()),
"name": "Port of Takoradi",
"country": "Ghana",
"countryCode": "GH",
"latitude": 4.8833,
"longitude": -1.7500,
},
{
"uuid": str(uuid.uuid4()),
"name": "Port of Lagos",
"country": "Nigeria",
"countryCode": "NG",
"latitude": 6.4531,
"longitude": 3.3958,
},
{
"uuid": str(uuid.uuid4()),
"name": "Port of Douala",
"country": "Cameroon",
"countryCode": "CM",
"latitude": 4.0483,
"longitude": 9.7043,
},
{
"uuid": str(uuid.uuid4()),
"name": "Port of Lomé",
"country": "Togo",
"countryCode": "TG",
"latitude": 6.1375,
"longitude": 1.2125,
},
]
def _create_suppliers(self, count: int, hubs: list) -> list:
    """Create *count* SupplierProfile rows and return them.

    Each supplier gets coordinates jittered by up to ±0.5 degrees around a
    random hub (or around its country's default coordinates when *hubs* is
    empty), a name and stable UUIDs taken from the company pool, and a
    randomly chosen verified flag (two chances in three).

    NOTE(review): coordinates come from a random hub while the country comes
    from a random AFRICAN_COUNTRIES entry or the company's country code, so
    the two may disagree — confirm this is acceptable for seed data.
    NOTE(review): `_pick_company` may return the same company twice, which
    repeats the stable UUIDs — confirm SupplierProfile tolerates that.
    """
    profiles = []
    for position in range(count):
        anchor = random.choice(hubs) if hubs else None
        country, country_code = self._get_random_african_country()

        # Use hub coordinates if available, otherwise use country defaults
        if anchor:
            base_lat, base_lng = anchor["latitude"], anchor["longitude"]
        else:
            base_lat, base_lng = self._get_country_coords(country)
        lat = base_lat + random.uniform(-0.5, 0.5)
        lng = base_lng + random.uniform(-0.5, 0.5)

        company = self._pick_company(position)
        if company:
            name = company["name"]
            company_code = company.get("country_code")
            mapped_name = self._country_name_from_code(company_code)
            # Only override the random country when the code is a known one.
            if mapped_name:
                country, country_code = mapped_name, company_code
            identity = company.get("lei") or name
            supplier_uuid = self._stable_uuid("supplier", identity)
            team_uuid = self._stable_uuid("team", identity)
        else:
            name = self._generate_supplier_name(position)
            supplier_uuid = str(uuid.uuid4())
            team_uuid = str(uuid.uuid4())

        description = (
            f"{name} is a reliable supplier based in {country}, "
            "focused on consistent quality and transparent logistics."
        )
        profiles.append(
            SupplierProfile.objects.create(
                uuid=supplier_uuid,
                team_uuid=team_uuid,
                name=name,
                description=description,
                country=country,
                country_code=country_code,
                logo_url="",
                latitude=lat,
                longitude=lng,
                is_verified=random.choice([True, True, False]),  # 66% verified
                is_active=True,
            )
        )
    return profiles
def _generate_supplier_name(self, index: int) -> str:
    """Always raise: generated supplier names are disabled.

    Raises:
        CommandError: unconditionally, asking for a real company CSV.
    """
    reason = "Supplier name fallback is disabled. Provide real company CSV."
    raise CommandError(reason)
def _find_default_company_csv(self) -> str | None:
    """Return the first existing default company CSV, or None.

    Probes ``<ancestor>/DEFAULT_GLEIF_PATH`` for every ancestor directory of
    this file, nearest first.
    """
    ancestors = Path(__file__).resolve().parents
    candidates = (folder / DEFAULT_GLEIF_PATH for folder in ancestors)
    return next((str(found) for found in candidates if found.exists()), None)
def _load_company_pool(self, csv_path: str | None) -> list[dict]:
    """Load company records from the company CSV in random order.

    Args:
        csv_path: Explicit CSV path; when empty or None the repo default
            located by `_find_default_company_csv` is used.

    Returns:
        A shuffled list of dicts with the keys ``name`` (``entity_name``
        column, de-duplicated), ``lei``, ``country_code`` and ``city``. The
        last two prefer the ``legal_address_*`` column and fall back to the
        ``headquarters_*`` column. The list may be empty.

    Raises:
        CommandError: the file is missing or cannot be read.
    """
    path = csv_path or self._find_default_company_csv()
    if not path or not os.path.exists(path):
        raise CommandError(
            "Company CSV not found. Seed requires real company names; "
            "ensure datasets/gleif/africa_lei_companies.csv is available."
        )
    companies = []
    seen = set()
    try:
        # utf-8-sig also reads plain UTF-8 but strips a leading BOM; with a
        # BOM the first header would be corrupted and could no longer match
        # its expected column name.
        with open(path, newline="", encoding="utf-8-sig") as f:
            for row in csv.DictReader(f):
                name = (row.get("entity_name") or "").strip()
                if not name or name in seen:
                    continue
                seen.add(name)
                companies.append(
                    {
                        "name": name,
                        "lei": (row.get("lei") or "").strip(),
                        "country_code": (row.get("legal_address_country") or row.get("headquarters_country") or "").strip(),
                        "city": (row.get("legal_address_city") or row.get("headquarters_city") or "").strip(),
                    }
                )
    except (OSError, UnicodeError, csv.Error) as e:
        # Only I/O, decoding and CSV parsing failures are reported as a read
        # failure; the original exception stays attached as the cause.
        raise CommandError(f"Failed to read company CSV: {e}") from e
    random.shuffle(companies)
    self.stdout.write(f"Loaded {len(companies)} company names from CSV")
    return companies
def _pick_company(self, index: int) -> dict | None:
if not getattr(self, "_company_pool", None):
raise CommandError("Company pool is empty. Seed requires real company CSV.")
if index < len(self._company_pool):
return self._company_pool[index]
return random.choice(self._company_pool)
def _stable_uuid(self, prefix: str, value: str) -> str:
return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{prefix}:{value}"))
def _country_name_from_code(self, code: str | None) -> str | None:
    """Map a country code to its name using AFRICAN_COUNTRIES.

    Returns None for an empty code or a code that is not in the table.
    """
    if not code:
        return None
    matches = (name for name, iso, _lat, _lng in AFRICAN_COUNTRIES if iso == code)
    return next(matches, None)
def _normalize_geo_url(self, url: str) -> str:
"""Ensure geo URL has scheme and GraphQL path."""
value = url.strip()
if not value.startswith(("http://", "https://")):
value = f"http://{value}"
if "/graphql" not in value:
value = value.rstrip("/") + "/graphql/public/"
return value
def _price_for_product(self, product_name: str) -> Decimal:
for item in PRODUCT_CATALOG:
if item["name"].lower() == product_name.lower():
return item["price"]
return Decimal("1000.00")
def _pick_location(self, supplier: SupplierProfile, hubs: list, supplier_ratio: float) -> dict:
"""Pick location: supplier address (ratio) or hub."""
use_supplier = random.random() < supplier_ratio
if use_supplier:
location_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"supplier:{supplier.uuid}"))
return {
"uuid": location_uuid,
"name": f"{supplier.name} Warehouse",
"country": supplier.country,
"countryCode": supplier.country_code,
"latitude": supplier.latitude,
"longitude": supplier.longitude,
}
return random.choice(hubs) if hubs else {
"uuid": str(uuid.uuid4()),
"name": "Regional Hub",
"country": supplier.country,
"countryCode": supplier.country_code,
"latitude": supplier.latitude,
"longitude": supplier.longitude,
}
def _create_offers_via_workflow(
    self, count: int, hubs: list, products: list, supplier_ratio: float, sleep_ms: int
) -> list:
    """Create *count* offers through ``OfferService.create_offer_via_workflow``.

    Every offer uses a random existing supplier, a location from
    `_pick_location`, a random product and a random quantity between 10 and
    500 tons. A failing offer is reported on stdout and skipped. When
    *sleep_ms* is non-zero the loop pauses that long after every attempt.

    Returns:
        The uuids of the offers that were created; an empty list when no
        supplier exists.
    """
    offer_uuids = []
    supplier_pool = list(SupplierProfile.objects.all())
    if not supplier_pool:
        self.stdout.write(self.style.ERROR("No suppliers found. Create suppliers first."))
        return offer_uuids

    for ordinal in range(1, count + 1):
        owner = random.choice(supplier_pool)
        place = self._pick_location(owner, hubs, supplier_ratio)
        product_name, category_name, product_uuid, product_price = random.choice(products)
        request = OfferData(
            team_uuid=owner.team_uuid,
            product_uuid=product_uuid,
            product_name=product_name,
            category_name=category_name,
            location_uuid=place["uuid"],
            location_name=place["name"],
            location_country=place["country"],
            location_country_code=place.get("countryCode", ""),
            location_latitude=place["latitude"],
            location_longitude=place["longitude"],
            quantity=self._rand_decimal(10, 500, 2),
            unit="ton",
            price_per_unit=product_price,
            currency="USD",
            description=f"{product_name} available from {place['name']} in {place['country']}",
        )
        progress = f" [{ordinal}/{count}]"
        try:
            offer_uuid, workflow_id, _ = OfferService.create_offer_via_workflow(request)
            self.stdout.write(f"{progress} Created offer {offer_uuid[:8]}... workflow: {workflow_id}")
            offer_uuids.append(offer_uuid)
        except Exception as exc:
            # Best-effort seeding: report the failure and move on.
            self.stdout.write(self.style.ERROR(f"{progress} Failed: {exc}"))
        if sleep_ms:
            time.sleep(sleep_ms / 1000.0)
    return offer_uuids
def _create_offers_direct(
    self, count: int, hubs: list, products: list, supplier_ratio: float, sleep_ms: int
) -> list:
    """Insert *count* offers straight into the database, one row at a time.

    No workflow is started, so nothing is synced to the graph. Rows are
    created with status "active" and workflow_status "pending". When
    *sleep_ms* is non-zero the loop pauses that long after every insert.

    Returns:
        The created Offer instances; an empty list when no supplier exists.
    """
    offers = []
    supplier_pool = list(SupplierProfile.objects.all())
    if not supplier_pool:
        self.stdout.write(self.style.ERROR("No suppliers found. Create suppliers first."))
        return offers

    for _ in range(count):
        owner = random.choice(supplier_pool)
        place = self._pick_location(owner, hubs, supplier_ratio)
        product_name, category_name, product_uuid, product_price = random.choice(products)
        fields = dict(
            uuid=str(uuid.uuid4()),
            team_uuid=owner.team_uuid,
            status="active",
            workflow_status="pending",
            location_uuid=place["uuid"],
            location_name=place["name"],
            location_country=place["country"],
            location_country_code=place.get("countryCode", ""),
            location_latitude=place["latitude"],
            location_longitude=place["longitude"],
            product_uuid=product_uuid,
            product_name=product_name,
            category_name=category_name,
            quantity=self._rand_decimal(10, 500, 2),
            unit="ton",
            price_per_unit=product_price,
            currency="USD",
            description=f"{product_name} available from {place['name']} in {place['country']}",
        )
        offers.append(Offer.objects.create(**fields))
        if sleep_ms:
            time.sleep(sleep_ms / 1000.0)
    return offers
def _create_offers_direct_bulk(
    self, count: int, hubs: list, products: list, supplier_ratio: float, bulk_size: int
) -> list:
    """Insert *count* offers with ``bulk_create`` in batches of *bulk_size*.

    No workflow is started, so nothing is synced to the graph. Rows are
    built with status "active" and workflow_status "pending".

    Returns:
        The uuids assigned to the offers, in creation order; an empty list
        when no supplier exists.
    """
    supplier_pool = list(SupplierProfile.objects.all())
    if not supplier_pool:
        self.stdout.write(self.style.ERROR("No suppliers found. Create suppliers first."))
        return []

    assigned_uuids: list[str] = []
    pending: list[Offer] = []
    for _ in range(count):
        owner = random.choice(supplier_pool)
        place = self._pick_location(owner, hubs, supplier_ratio)
        product_name, category_name, product_uuid, product_price = random.choice(products)
        offer_uuid = str(uuid.uuid4())
        pending.append(
            Offer(
                uuid=offer_uuid,
                team_uuid=owner.team_uuid,
                status="active",
                workflow_status="pending",
                location_uuid=place["uuid"],
                location_name=place["name"],
                location_country=place["country"],
                location_country_code=place.get("countryCode", ""),
                location_latitude=place["latitude"],
                location_longitude=place["longitude"],
                product_uuid=product_uuid,
                product_name=product_name,
                category_name=category_name,
                quantity=self._rand_decimal(10, 500, 2),
                unit="ton",
                price_per_unit=product_price,
                currency="USD",
                description=f"{product_name} available from {place['name']} in {place['country']}",
            )
        )
        assigned_uuids.append(offer_uuid)
        if len(pending) < bulk_size:
            continue
        # Batch is full: flush it and start a new one.
        Offer.objects.bulk_create(pending, batch_size=bulk_size)
        pending = []

    # Flush the final, partially filled batch.
    if pending:
        Offer.objects.bulk_create(pending, batch_size=bulk_size)
    return assigned_uuids
def _get_random_african_country(self) -> tuple:
    """Return ``(name, code)`` of a randomly chosen AFRICAN_COUNTRIES entry."""
    name, iso, _lat, _lng = random.choice(AFRICAN_COUNTRIES)
    picked = (name, iso)
    return picked
def _get_country_coords(self, country: str) -> tuple:
    """Return ``(latitude, longitude)`` for *country* from AFRICAN_COUNTRIES.

    Falls back to ``(6.0, 0.0)`` (Gulf of Guinea) for an unknown country.
    """
    known = (
        (lat, lng)
        for name, _code, lat, lng in AFRICAN_COUNTRIES
        if name == country
    )
    return next(known, (6.0, 0.0))  # Default: Gulf of Guinea
def _rand_decimal(self, low: int, high: int, places: int) -> Decimal:
value = random.uniform(low, high)
quantize_str = "1." + "0" * places
return Decimal(str(value)).quantize(Decimal(quantize_str))