Files
exchange/offers/management/commands/seed_exchange.py
Ruslan Bakiev c654d25230
All checks were successful
Build Docker Image / build (push) Successful in 2m4s
Add bulk seeding option
2026-02-05 01:34:39 +07:00

769 lines
30 KiB
Python

"""
Seed Suppliers and Offers for African cocoa belt.
Creates offers via Temporal workflow so they sync to the graph.
"""
import random
import uuid
from decimal import Decimal
import requests
from django.core.management.base import BaseCommand
from django.db import transaction
from offers.models import Offer
from offers.services import OfferService, OfferData
from suppliers.models import SupplierProfile
# African cocoa belt countries
AFRICAN_COUNTRIES = [
("Côte d'Ivoire", "CI", 6.8276, -5.2893), # Abidjan
("Ghana", "GH", 5.6037, -0.1870), # Accra
("Nigeria", "NG", 6.5244, 3.3792), # Lagos
("Cameroon", "CM", 4.0511, 9.7679), # Douala
("Togo", "TG", 6.1725, 1.2314), # Lomé
]
# Realistic supplier names (English, Africa-focused)
SUPPLIER_NAMES = [
"Cocoa Coast Exports", "Golden Savannah Trading", "Abidjan Agro Partners",
"Volta River Commodities", "Lagos Harbor Supply", "Accra Prime Exports",
"Tema Logistics & Trading", "Sahel Harvest Group", "Nile Delta Commodities",
"Gulf of Guinea Traders", "Kumasi Cocoa Collective", "Benin AgroLink",
"Douala Growth Partners", "Westbridge Commodities", "Ivory Gate Exporters",
"Ghana Frontier Trading", "Sunrise Agro Holdings", "Coastal Belt Supply",
"Keta Shore Commodities", "Takoradi Export House", "Mango Bay Trading",
"Savanna Crest Exports", "Sankofa Trade Corp", "Niger Delta Agrimark",
"Lake Volta Produce", "Zou River Exports", "Lomé Port Traders",
"Atlantic Harvest Co", "Forest Belt Commodities", "Côte d'Ivoire Supply",
"Ashanti Agro Trade", "Midland Cocoa Group", "Sahelian Produce Traders",
"Kintampo Agro Partners", "Gold Coast Exporters", "Cashew Ridge Trading",
"Prairie Coast Supply", "Harborline Exports", "Palm Coast Commodities",
"Green Belt Trading", "Westland Agro Link", "Delta Coast Produce",
"Kongo River Exports", "Bight of Benin Supply", "Akwa Ibom Traders",
"Cameroon Highlands Trading", "Coastal Plains Export", "Guinea Gulf Trading",
"Korhogo Agro Supply", "Northern Plains Traders", "Oti River Exports",
"Eastern Coast Commodities", "Sunset Bay Exporters", "Freetown Agro Trade",
"Makola Market Supply", "Afram Plains Trading", "Cedar Coast Commodities",
"Monrovia Export House", "Bissau Agro Partners", "Lac Togo Traders",
"Riverine Agro Link", "Cape Coast Exporters", "Delta Rise Commodities",
"Mali Savanna Trade", "Burkina Harvest Co", "Niger Basin Exports",
"Sierra Green Trading", "Liberia Agro Collective", "Congo Gate Traders",
"Ashanti Heritage Exports", "Ivory Belt Trading", "Sahel Horizon Supply",
"Atlantic Crest Commodities", "Green Valley Export", "Cocoa Ridge Trade",
"Palm Grove Exports", "Keta Delta Trading", "Lagoon Coast Commodities",
"Accra Trade Works", "Tema Export Alliance", "Lagos Trade Link",
"Cape Three Points Exports", "Ivory Coast Agro Hub", "Savanna Trade Network",
"Nile Coast Commodities", "Sahara Edge Trading", "Goldleaf Exports",
"Makeni Agro Partners", "Bamako Produce Traders", "Ouagadougou Exports",
"Conakry Trade House", "Port Harcourt Supply", "Calabar Exporters",
"Abuja Agro Traders", "Eko Commodities", "Gabon Forest Trade",
"Libreville Export Group", "Senegal River Commodities", "Dakar Trade Alliance",
"Kaolack Agro Supply", "Saint-Louis Exporters", "Zanzibar Coast Trading",
"Kilwa Harvest Group", "Lake Victoria Exports", "Mombasa Trade Gate",
"Dar Coast Commodities", "Maputo Export House",
]
# Fixed product catalog (10 items) with realistic prices per ton (USD)
PRODUCT_CATALOG = [
{"name": "Cocoa Beans", "category": "Cocoa", "price": Decimal("2450.00")},
{"name": "Shea Butter", "category": "Oils & Fats", "price": Decimal("1800.00")},
{"name": "Cashew Nuts", "category": "Nuts", "price": Decimal("5200.00")},
{"name": "Palm Oil", "category": "Oils & Fats", "price": Decimal("980.00")},
{"name": "Coffee Beans", "category": "Coffee", "price": Decimal("3800.00")},
{"name": "Sesame Seeds", "category": "Seeds", "price": Decimal("2100.00")},
{"name": "Cotton", "category": "Fiber", "price": Decimal("1650.00")},
{"name": "Maize", "category": "Grains", "price": Decimal("260.00")},
{"name": "Sorghum", "category": "Grains", "price": Decimal("230.00")},
{"name": "Natural Rubber", "category": "Industrial", "price": Decimal("1750.00")},
]
class Command(BaseCommand):
help = "Seed Suppliers and Offers for African cocoa belt with workflow sync"
def add_arguments(self, parser):
parser.add_argument(
"--suppliers",
type=int,
default=10,
help="How many suppliers to create (default: 10)",
)
parser.add_argument(
"--offers",
type=int,
default=50,
help="How many offers to create (default: 50)",
)
parser.add_argument(
"--product-count",
type=int,
default=10,
help="How many distinct products to use (default: 10)",
)
parser.add_argument(
"--supplier-location-ratio",
type=float,
default=0.8,
help="Share of offers that use supplier address (default: 0.8)",
)
parser.add_argument(
"--clear",
action="store_true",
help="Delete all existing suppliers and offers before seeding",
)
parser.add_argument(
"--no-workflow",
action="store_true",
help="Create offers directly in DB without workflow (no graph sync)",
)
parser.add_argument(
"--bulk",
action="store_true",
help="Use bulk_create for offers (only with --no-workflow)",
)
parser.add_argument(
"--bulk-size",
type=int,
default=200,
help="Batch size for bulk_create (default: 200)",
)
parser.add_argument(
"--geo-url",
type=str,
default="http://geo:8000/graphql/public/",
help="Geo service GraphQL URL (default: http://geo:8000/graphql/public/)",
)
parser.add_argument(
"--odoo-url",
type=str,
default="http://odoo:8069",
help="Odoo URL (default: http://odoo:8069)",
)
parser.add_argument(
"--ensure-products",
action="store_true",
help="Ensure product catalog exists in Odoo (create if missing)",
)
parser.add_argument(
"--odoo-db",
type=str,
default="odoo",
help="Odoo database name (default: odoo)",
)
parser.add_argument(
"--odoo-user",
type=int,
default=2,
help="Odoo user id (default: 2)",
)
parser.add_argument(
"--odoo-password",
type=str,
default="admin",
help="Odoo password (default: admin)",
)
parser.add_argument(
"--product",
type=str,
default=None,
help="Filter offers by product name (e.g., 'Cocoa Beans')",
)
def handle(self, *args, **options):
if options["clear"]:
with transaction.atomic():
offers_deleted, _ = Offer.objects.all().delete()
suppliers_deleted, _ = SupplierProfile.objects.all().delete()
self.stdout.write(self.style.WARNING(
f"Deleted {suppliers_deleted} supplier profiles and {offers_deleted} offers"
))
suppliers_count = max(0, options["suppliers"])
offers_count = max(0, options["offers"])
product_count = max(1, options["product_count"])
supplier_location_ratio = min(max(options["supplier_location_ratio"], 0.0), 1.0)
use_workflow = not options["no_workflow"]
use_bulk = options["bulk"]
bulk_size = max(1, options["bulk_size"])
geo_url = options["geo_url"]
odoo_url = options["odoo_url"]
product_filter = options["product"]
ensure_products = options["ensure_products"]
odoo_db = options["odoo_db"]
odoo_user = options["odoo_user"]
odoo_password = options["odoo_password"]
# Fetch products from Odoo
self.stdout.write("Fetching products from Odoo...")
products = self._fetch_products_from_odoo(odoo_url, odoo_db, odoo_user, odoo_password)
if ensure_products:
self.stdout.write("Ensuring product catalog exists in Odoo...")
products = self._ensure_products_in_odoo(
odoo_url, odoo_db, odoo_user, odoo_password, products
)
if not products:
self.stdout.write(self.style.WARNING("No products found in Odoo. Falling back to catalog only."))
products = self._catalog_products()
self.stdout.write(f"Found {len(products)} products")
# Filter by product name if specified
if product_filter:
products = [p for p in products if product_filter.lower() in p[0].lower()]
if not products:
self.stdout.write(self.style.ERROR(f"No products matching '{product_filter}' found."))
return
self.stdout.write(f"Filtered to {len(products)} products matching '{product_filter}'")
# Limit to product_count distinct items (random sample if possible)
if len(products) > product_count:
products = random.sample(products, product_count)
self.stdout.write(f"Using {len(products)} products for seeding")
# Fetch African hubs from geo service
self.stdout.write("Fetching African hubs from geo service...")
hubs = self._fetch_african_hubs(geo_url)
if not hubs:
self.stdout.write(self.style.WARNING(
"No African hubs found. Using default locations."
))
hubs = self._default_african_hubs()
self.stdout.write(f"Found {len(hubs)} African hubs")
# Create suppliers
self.stdout.write(f"Creating {suppliers_count} suppliers...")
new_suppliers = self._create_suppliers(suppliers_count, hubs)
self.stdout.write(self.style.SUCCESS(f"Created {len(new_suppliers)} suppliers"))
# Create offers
self.stdout.write(f"Creating {offers_count} offers (workflow={use_workflow})...")
if use_workflow and use_bulk:
self.stdout.write(self.style.ERROR("Bulk mode is only supported with --no-workflow."))
return
if use_workflow:
created_offers = self._create_offers_via_workflow(
offers_count, hubs, products, supplier_location_ratio
)
elif use_bulk:
created_offers = self._create_offers_direct_bulk(
offers_count, hubs, products, supplier_location_ratio, bulk_size
)
else:
created_offers = self._create_offers_direct(
offers_count, hubs, products, supplier_location_ratio
)
self.stdout.write(self.style.SUCCESS(f"Created {len(created_offers)} offers"))
def _catalog_products(self) -> list:
return [(p["name"], p["category"], str(uuid.uuid4()), p["price"]) for p in PRODUCT_CATALOG]
def _fetch_products_from_odoo(self, odoo_url: str, odoo_db: str, odoo_user: int, odoo_password: str) -> list:
"""Fetch products from Odoo via JSON-RPC"""
products = []
try:
# Search for products
response = requests.post(
f"{odoo_url}/jsonrpc",
json={
"jsonrpc": "2.0",
"method": "call",
"params": {
"service": "object",
"method": "execute_kw",
"args": [
odoo_db, # database
odoo_user, # uid
odoo_password, # password
"products.product", # model
"search_read",
[[]], # domain (all products)
{"fields": ["uuid", "name", "category_id"]},
],
},
"id": 1,
},
timeout=10,
)
if response.status_code == 200:
data = response.json()
result = data.get("result", [])
for p in result:
category_name = p.get("category_id", [None, "Agriculture"])[1] if p.get("category_id") else "Agriculture"
price = self._price_for_product(p.get("name", ""))
products.append((p["name"], category_name, p.get("uuid") or str(uuid.uuid4()), price))
except Exception as e:
self.stdout.write(self.style.WARNING(f"Failed to fetch products from Odoo: {e}"))
return products
def _ensure_products_in_odoo(
self, odoo_url: str, odoo_db: str, odoo_user: int, odoo_password: str, existing: list
) -> list:
"""Ensure PRODUCT_CATALOG exists in Odoo, return unified list."""
existing_names = {p[0] for p in existing}
products = list(existing)
for item in PRODUCT_CATALOG:
if item["name"] in existing_names:
continue
try:
# Find or create category
category_id = self._get_or_create_category(
odoo_url, odoo_db, odoo_user, odoo_password, item["category"]
)
response = requests.post(
f"{odoo_url}/jsonrpc",
json={
"jsonrpc": "2.0",
"method": "call",
"params": {
"service": "object",
"method": "execute_kw",
"args": [
odoo_db,
odoo_user,
odoo_password,
"products.product",
"create",
[
{
"name": item["name"],
"category_id": category_id,
"uuid": str(uuid.uuid4()),
}
],
],
},
"id": 1,
},
timeout=10,
)
if response.status_code == 200 and response.json().get("result"):
created_uuid = self._fetch_product_uuid(
odoo_url, odoo_db, odoo_user, odoo_password, item["name"]
)
products.append((
item["name"],
item["category"],
created_uuid or str(uuid.uuid4()),
item["price"],
))
except Exception as e:
self.stdout.write(self.style.WARNING(f"Failed to create product {item['name']}: {e}"))
return products
def _fetch_product_uuid(
self, odoo_url: str, odoo_db: str, odoo_user: int, odoo_password: str, name: str
) -> str | None:
response = requests.post(
f"{odoo_url}/jsonrpc",
json={
"jsonrpc": "2.0",
"method": "call",
"params": {
"service": "object",
"method": "execute_kw",
"args": [
odoo_db,
odoo_user,
odoo_password,
"products.product",
"search_read",
[[("name", "=", name)]],
{"fields": ["uuid"], "limit": 1},
],
},
"id": 1,
},
timeout=10,
)
if response.status_code == 200:
result = response.json().get("result", [])
if result and result[0].get("uuid"):
return result[0]["uuid"]
return None
def _get_or_create_category(
self, odoo_url: str, odoo_db: str, odoo_user: int, odoo_password: str, name: str
) -> int:
"""Find or create a product category in Odoo."""
response = requests.post(
f"{odoo_url}/jsonrpc",
json={
"jsonrpc": "2.0",
"method": "call",
"params": {
"service": "object",
"method": "execute_kw",
"args": [
odoo_db,
odoo_user,
odoo_password,
"product.category",
"search",
[[("name", "=", name)]],
{"limit": 1},
],
},
"id": 1,
},
timeout=10,
)
if response.status_code == 200 and response.json().get("result"):
return response.json()["result"][0]
response = requests.post(
f"{odoo_url}/jsonrpc",
json={
"jsonrpc": "2.0",
"method": "call",
"params": {
"service": "object",
"method": "execute_kw",
"args": [
odoo_db,
odoo_user,
odoo_password,
"product.category",
"create",
[{"name": name}],
],
},
"id": 1,
},
timeout=10,
)
return response.json().get("result", 1)
def _fetch_african_hubs(self, geo_url: str) -> list:
"""Fetch African hubs from geo service via GraphQL.
Gets all nodes and filters by African countries in Python
since the GraphQL schema doesn't support country filter.
"""
african_countries = {
"Côte d'Ivoire", "Ivory Coast", "Ghana", "Nigeria",
"Cameroon", "Togo", "Senegal", "Mali", "Burkina Faso",
"Guinea", "Benin", "Niger", "Sierra Leone", "Liberia",
}
query = """
query GetNodes($limit: Int) {
nodes(limit: $limit) {
uuid
name
country
countryCode
latitude
longitude
}
}
"""
try:
response = requests.post(
geo_url,
json={"query": query, "variables": {"limit": 5000}},
timeout=30,
)
if response.status_code == 200:
data = response.json()
if "errors" in data:
self.stdout.write(self.style.WARNING(f"GraphQL errors: {data['errors']}"))
return []
nodes = data.get("data", {}).get("nodes", [])
# Filter by African countries
african_hubs = [
n for n in nodes
if n.get("country") in african_countries
]
return african_hubs
except Exception as e:
self.stdout.write(self.style.WARNING(f"Failed to fetch hubs: {e}"))
return []
def _default_african_hubs(self) -> list:
"""Default African hubs if geo service is unavailable"""
return [
{
"uuid": str(uuid.uuid4()),
"name": "Port of Abidjan",
"country": "Côte d'Ivoire",
"countryCode": "CI",
"latitude": 5.3167,
"longitude": -4.0167,
},
{
"uuid": str(uuid.uuid4()),
"name": "Port of San Pedro",
"country": "Côte d'Ivoire",
"countryCode": "CI",
"latitude": 4.7500,
"longitude": -6.6333,
},
{
"uuid": str(uuid.uuid4()),
"name": "Port of Tema",
"country": "Ghana",
"countryCode": "GH",
"latitude": 5.6333,
"longitude": -0.0167,
},
{
"uuid": str(uuid.uuid4()),
"name": "Port of Takoradi",
"country": "Ghana",
"countryCode": "GH",
"latitude": 4.8833,
"longitude": -1.7500,
},
{
"uuid": str(uuid.uuid4()),
"name": "Port of Lagos",
"country": "Nigeria",
"countryCode": "NG",
"latitude": 6.4531,
"longitude": 3.3958,
},
{
"uuid": str(uuid.uuid4()),
"name": "Port of Douala",
"country": "Cameroon",
"countryCode": "CM",
"latitude": 4.0483,
"longitude": 9.7043,
},
{
"uuid": str(uuid.uuid4()),
"name": "Port of Lomé",
"country": "Togo",
"countryCode": "TG",
"latitude": 6.1375,
"longitude": 1.2125,
},
]
def _create_suppliers(self, count: int, hubs: list) -> list:
"""Create supplier profiles in African countries"""
created = []
for idx in range(count):
hub = random.choice(hubs) if hubs else None
country, country_code = self._get_random_african_country()
# Use hub coordinates if available, otherwise use country defaults
if hub:
lat = hub["latitude"] + random.uniform(-0.5, 0.5)
lng = hub["longitude"] + random.uniform(-0.5, 0.5)
else:
lat, lng = self._get_country_coords(country)
lat += random.uniform(-0.5, 0.5)
lng += random.uniform(-0.5, 0.5)
name = self._generate_supplier_name(idx)
description = (
f"{name} is a reliable supplier based in {country}, "
"focused on consistent quality and transparent logistics."
)
profile = SupplierProfile.objects.create(
uuid=str(uuid.uuid4()),
team_uuid=str(uuid.uuid4()),
name=name,
description=description,
country=country,
country_code=country_code,
logo_url="",
latitude=lat,
longitude=lng,
is_verified=random.choice([True, True, False]), # 66% verified
is_active=True,
)
created.append(profile)
return created
def _generate_supplier_name(self, index: int) -> str:
"""Pick a realistic supplier name; fall back if list is exhausted."""
if index < len(SUPPLIER_NAMES):
return SUPPLIER_NAMES[index]
return f"{random.choice(SUPPLIER_NAMES)} Group"
def _price_for_product(self, product_name: str) -> Decimal:
for item in PRODUCT_CATALOG:
if item["name"].lower() == product_name.lower():
return item["price"]
return Decimal("1000.00")
def _pick_location(self, supplier: SupplierProfile, hubs: list, supplier_ratio: float) -> dict:
"""Pick location: supplier address (ratio) or hub."""
use_supplier = random.random() < supplier_ratio
if use_supplier:
location_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"supplier:{supplier.uuid}"))
return {
"uuid": location_uuid,
"name": f"{supplier.name} Warehouse",
"country": supplier.country,
"countryCode": supplier.country_code,
"latitude": supplier.latitude,
"longitude": supplier.longitude,
}
return random.choice(hubs) if hubs else {
"uuid": str(uuid.uuid4()),
"name": "Regional Hub",
"country": supplier.country,
"countryCode": supplier.country_code,
"latitude": supplier.latitude,
"longitude": supplier.longitude,
}
def _create_offers_via_workflow(self, count: int, hubs: list, products: list, supplier_ratio: float) -> list:
"""Create offers via Temporal workflow (syncs to graph)"""
created = []
suppliers = list(SupplierProfile.objects.all())
if not suppliers:
self.stdout.write(self.style.ERROR("No suppliers found. Create suppliers first."))
return created
for idx in range(count):
supplier = random.choice(suppliers)
hub = self._pick_location(supplier, hubs, supplier_ratio)
product_name, category_name, product_uuid, product_price = random.choice(products)
data = OfferData(
team_uuid=supplier.team_uuid,
product_uuid=product_uuid,
product_name=product_name,
category_name=category_name,
location_uuid=hub["uuid"],
location_name=hub["name"],
location_country=hub["country"],
location_country_code=hub.get("countryCode", ""),
location_latitude=hub["latitude"],
location_longitude=hub["longitude"],
quantity=self._rand_decimal(10, 500, 2),
unit="ton",
price_per_unit=product_price,
currency="USD",
description=f"{product_name} available from {hub['name']} in {hub['country']}",
)
try:
offer_uuid, workflow_id, _ = OfferService.create_offer_via_workflow(data)
self.stdout.write(f" [{idx+1}/{count}] Created offer {offer_uuid[:8]}... workflow: {workflow_id}")
created.append(offer_uuid)
except Exception as e:
self.stdout.write(self.style.ERROR(f" [{idx+1}/{count}] Failed: {e}"))
return created
def _create_offers_direct(self, count: int, hubs: list, products: list, supplier_ratio: float) -> list:
"""Create offers directly in DB (no workflow, no graph sync)"""
created = []
suppliers = list(SupplierProfile.objects.all())
if not suppliers:
self.stdout.write(self.style.ERROR("No suppliers found. Create suppliers first."))
return created
for idx in range(count):
supplier = random.choice(suppliers)
hub = self._pick_location(supplier, hubs, supplier_ratio)
product_name, category_name, product_uuid, product_price = random.choice(products)
offer = Offer.objects.create(
uuid=str(uuid.uuid4()),
team_uuid=supplier.team_uuid,
status="active",
workflow_status="pending",
location_uuid=hub["uuid"],
location_name=hub["name"],
location_country=hub["country"],
location_country_code=hub.get("countryCode", ""),
location_latitude=hub["latitude"],
location_longitude=hub["longitude"],
product_uuid=product_uuid,
product_name=product_name,
category_name=category_name,
quantity=self._rand_decimal(10, 500, 2),
unit="ton",
price_per_unit=product_price,
currency="USD",
description=f"{product_name} available from {hub['name']} in {hub['country']}",
)
created.append(offer)
return created
def _create_offers_direct_bulk(
self, count: int, hubs: list, products: list, supplier_ratio: float, bulk_size: int
) -> list:
"""Create offers in bulk (no workflow, no graph sync)"""
suppliers = list(SupplierProfile.objects.all())
if not suppliers:
self.stdout.write(self.style.ERROR("No suppliers found. Create suppliers first."))
return []
created_uuids: list[str] = []
batch: list[Offer] = []
for idx in range(count):
supplier = random.choice(suppliers)
hub = self._pick_location(supplier, hubs, supplier_ratio)
product_name, category_name, product_uuid, product_price = random.choice(products)
offer_uuid = str(uuid.uuid4())
batch.append(
Offer(
uuid=offer_uuid,
team_uuid=supplier.team_uuid,
status="active",
workflow_status="pending",
location_uuid=hub["uuid"],
location_name=hub["name"],
location_country=hub["country"],
location_country_code=hub.get("countryCode", ""),
location_latitude=hub["latitude"],
location_longitude=hub["longitude"],
product_uuid=product_uuid,
product_name=product_name,
category_name=category_name,
quantity=self._rand_decimal(10, 500, 2),
unit="ton",
price_per_unit=product_price,
currency="USD",
description=f"{product_name} available from {hub['name']} in {hub['country']}",
)
)
created_uuids.append(offer_uuid)
if len(batch) >= bulk_size:
Offer.objects.bulk_create(batch, batch_size=bulk_size)
batch = []
if batch:
Offer.objects.bulk_create(batch, batch_size=bulk_size)
return created_uuids
def _get_random_african_country(self) -> tuple:
"""Get random African country name and code"""
country, code, _, _ = random.choice(AFRICAN_COUNTRIES)
return country, code
def _get_country_coords(self, country: str) -> tuple:
"""Get default coordinates for a country"""
for name, code, lat, lng in AFRICAN_COUNTRIES:
if name == country:
return lat, lng
return 6.0, 0.0 # Default: Gulf of Guinea
def _rand_decimal(self, low: int, high: int, places: int) -> Decimal:
value = random.uniform(low, high)
quantize_str = "1." + "0" * places
return Decimal(str(value)).quantize(Decimal(quantize_str))