Files
pickle_vision/jetson/main.py
Ruslan Bakiev 8add172f22 Rewrite calibration: green court surface segmentation instead of line angles
- Detect green court surface via HSV color thresholding
- Find court boundary as quadrilateral (contour approximation)
- Detect white line segments within the green area only
- Debug image: green overlay + court quad + white lines + camera XYZ
- Much more robust than angle-based line classification

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 14:58:58 +07:00

436 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Pickle Vision - Referee System main entry point for Jetson.
Dual CSI cameras, real-time YOLO detection, trajectory tracking, VAR triggers.
"""
import sys
import os
import cv2
import time
import base64
import argparse
import threading
import numpy as np
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from ultralytics import YOLO
from src.streaming.camera_reader import CameraReader
from src.streaming.ring_buffer import FrameRingBuffer
from src.physics.trajectory import TrajectoryModel
from src.physics.event_detector import EventDetector
from src.calibration.camera_calibrator import (
CameraCalibrator, get_half_court_3d_points,
COURT_LENGTH, COURT_WIDTH, HALF_COURT_LENGTH
)
from src.web.app import app, state
BALL_CLASS_ID = 32 # sports ball in COCO
# Global references set in main()
_cam_readers = {}
_args = None
def auto_calibrate():
"""Detect the green court surface, find its boundary quadrilateral,
map to known court dimensions, compute camera 3D position via solvePnP.
Debug images show:
- Green mask overlay (what the system sees as court)
- Detected quadrilateral (court boundary)
- White line segments found on court
- Computed camera position
"""
results = {}
for sensor_id, reader in _cam_readers.items():
frame = reader.grab()
if frame is None:
results[str(sensor_id)] = {'ok': False, 'error': 'No frame available'}
continue
h, w = frame.shape[:2]
side = 'left' if sensor_id == 0 else 'right'
debug_frame = frame.copy()
# Step 1: Detect green court surface
court = _detect_court_surface(frame)
# Draw green mask as semi-transparent overlay
if court['green_mask'] is not None:
green_overlay = np.zeros_like(debug_frame)
green_overlay[court['green_mask'] > 0] = (0, 80, 0)
debug_frame = cv2.addWeighted(debug_frame, 1.0, green_overlay, 0.3, 0)
if court['quad'] is None:
cv2.putText(debug_frame, f"FAILED: {court['error']}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
_, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
results[str(sensor_id)] = {
'ok': False,
'error': f"CAM {sensor_id}: {court['error']}",
'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'),
}
continue
quad = court['quad']
# Draw detected court quadrilateral
pts = quad.astype(int)
for i in range(4):
p1 = tuple(pts[i])
p2 = tuple(pts[(i + 1) % 4])
cv2.line(debug_frame, p1, p2, (0, 255, 0), 3)
cv2.circle(debug_frame, p1, 8, (0, 0, 255), -1)
cv2.putText(debug_frame, f"C{i}", (p1[0] + 10, p1[1] - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
# Step 2: Detect white lines on court surface
white_lines = _detect_white_lines_on_court(frame, court['green_mask'])
for seg in white_lines:
x1, y1, x2, y2 = seg
cv2.line(debug_frame, (x1, y1), (x2, y2), (255, 255, 0), 1)
# Step 3: Map quad corners to 3D court coordinates
# Quad corners are ordered: TL, TR, BR, BL
# For camera at net looking at one half-court:
# TL = far-left, TR = far-right (baseline)
# BL = near-left, BR = near-right (near net/camera)
corners_3d = get_half_court_3d_points(side)
# Calibrate
cal = CameraCalibrator()
cal.calibrate(quad, corners_3d, w, h)
cam_pos = (-cal.rotation_matrix.T @ cal.translation_vec).flatten()
cv2.putText(debug_frame,
f"CAM{sensor_id}: X={cam_pos[0]:.2f} Y={cam_pos[1]:.2f} Z={cam_pos[2]:.2f}m",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.putText(debug_frame,
f"{side} half | {len(white_lines)} white line segments",
(10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
# Save
cal_path = os.path.join(_args.calibration_dir,
f'cam{sensor_id}_calibration.json')
os.makedirs(os.path.dirname(cal_path), exist_ok=True)
cal.save(cal_path)
state['calibrators'][sensor_id] = cal
_, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
# Court lines in 3D for visualization
matched_lines_3d = _get_half_court_lines_3d(side)
results[str(sensor_id)] = {
'ok': True,
'camera_position': cam_pos.tolist(),
'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'),
'matched_lines_3d': matched_lines_3d,
}
print(f"[CAM {sensor_id}] Calibrated! Camera at "
f"({cam_pos[0]:.2f}, {cam_pos[1]:.2f}, {cam_pos[2]:.2f})")
return results
def _get_half_court_lines_3d(side):
"""Return 3D line segments for all lines of a half-court."""
if side == 'left':
bx, kx = 0, 4.57 # baseline, kitchen
else:
bx, kx = 13.4, 8.83
lines = [
{'name': 'baseline', 'from': [bx, 0, 0], 'to': [bx, 6.1, 0]},
{'name': 'kitchen', 'from': [kx, 0, 0], 'to': [kx, 6.1, 0]},
{'name': 'sideline_near', 'from': [bx, 0, 0], 'to': [kx, 0, 0]},
{'name': 'sideline_far', 'from': [bx, 6.1, 0], 'to': [kx, 6.1, 0]},
{'name': 'center_service', 'from': [bx, 3.05, 0], 'to': [kx, 3.05, 0]},
]
return lines
def _detect_court_surface(frame):
"""Detect the green court surface and find its boundary as a quadrilateral.
The court surface is distinctly green against pink/gray surroundings.
We segment by color, find the largest contour, and approximate with 4 corners.
Returns dict with:
quad: 4x2 numpy array of court boundary corners in image, or None
green_mask: binary mask of detected green surface
error: description if detection failed
"""
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# Green court surface — relatively broad range to handle lighting
lower_green = np.array([30, 30, 50])
upper_green = np.array([90, 255, 220])
green_mask = cv2.inRange(hsv, lower_green, upper_green)
# Clean up noise
kernel = np.ones((7, 7), np.uint8)
green_mask = cv2.morphologyEx(green_mask, cv2.MORPH_CLOSE, kernel)
green_mask = cv2.morphologyEx(green_mask, cv2.MORPH_OPEN, kernel)
# Find contours
contours, _ = cv2.findContours(green_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return {'quad': None, 'green_mask': green_mask, 'error': 'No green surface detected'}
# Take largest contour (should be the court)
largest = max(contours, key=cv2.contourArea)
area = cv2.contourArea(largest)
frame_area = frame.shape[0] * frame.shape[1]
if area < frame_area * 0.05:
return {'quad': None, 'green_mask': green_mask,
'error': f'Green area too small ({area / frame_area * 100:.0f}% of frame)'}
# Approximate contour with polygon
epsilon = 0.02 * cv2.arcLength(largest, True)
approx = cv2.approxPolyDP(largest, epsilon, True)
# We want 4 corners — if we got more, use convex hull + min area rect
if len(approx) == 4:
quad = approx.reshape(4, 2).astype(np.float32)
else:
# Use minimum area rectangle as fallback
rect = cv2.minAreaRect(largest)
quad = cv2.boxPoints(rect).astype(np.float32)
# Order corners: top-left, top-right, bottom-right, bottom-left
# Sort by y first to get top/bottom pairs, then by x within each pair
sorted_by_y = quad[np.argsort(quad[:, 1])]
top_pair = sorted_by_y[:2]
bottom_pair = sorted_by_y[2:]
top_pair = top_pair[np.argsort(top_pair[:, 0])]
bottom_pair = bottom_pair[np.argsort(bottom_pair[:, 0])]
quad = np.array([top_pair[0], top_pair[1], bottom_pair[1], bottom_pair[0]], dtype=np.float32)
return {'quad': quad, 'green_mask': green_mask, 'error': None}
def _detect_white_lines_on_court(frame, green_mask):
"""Detect white lines within the green court area.
Returns list of line segments [x1, y1, x2, y2] that are on the court surface.
"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# White lines are bright pixels on the green surface
_, white = cv2.threshold(gray, 180, 255, cv2.THRESH_BINARY)
# Only keep white pixels within/near the court
dilated_court = cv2.dilate(green_mask, np.ones((15, 15), np.uint8))
white_on_court = cv2.bitwise_and(white, dilated_court)
# Hough line detection
lines = cv2.HoughLinesP(white_on_court, 1, np.pi / 180, threshold=40,
minLineLength=50, maxLineGap=25)
if lines is None:
return []
return [line[0].tolist() for line in lines]
def _capture_var_snapshot(frame, event):
"""Create a snapshot for VAR event and store it in state."""
# Draw VAR overlay on frame
snapshot = frame.copy()
cv2.putText(snapshot, "VAR DETECT", (10, 40),
cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
cv2.putText(snapshot, f"Line: {event['line']} Dist: {event['distance_m']*100:.0f}cm",
(10, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
_, jpeg = cv2.imencode('.jpg', snapshot, [cv2.IMWRITE_JPEG_QUALITY, 85])
b64 = base64.b64encode(jpeg.tobytes()).decode('ascii')
state['last_var'] = {
'event': event,
'snapshot_b64': b64,
}
def detection_loop(cam_readers, model, conf_threshold, ring_buffer):
"""Main detection loop: alternate cameras, run YOLO, update state."""
frame_counts = {sid: 0 for sid in cam_readers}
start_times = {sid: time.time() for sid in cam_readers}
trajectory = state['trajectory']
event_detector = state['event_detector']
calibrators = state['calibrators']
while True:
for sensor_id, reader in cam_readers.items():
cam = state['cameras'][sensor_id]
frame = reader.grab()
if frame is None:
continue
now = time.time()
# Store raw frame in ring buffer for VAR
ring_buffer.push(frame, now, sensor_id)
# Only run detection if camera is calibrated
cal = calibrators.get(sensor_id)
is_calibrated = cal and cal.calibrated
det_count = 0
best_detection = None
best_conf = 0
if is_calibrated:
# YOLO detection — only after calibration
results = model(frame, verbose=False, classes=[BALL_CLASS_ID], conf=conf_threshold)
for r in results:
for box in r.boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0])
conf = float(box.conf[0])
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3)
label = f"Ball {conf:.0%}"
(tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)
cv2.rectangle(frame, (x1, y1 - th - 10), (x1 + tw, y1), (0, 255, 0), -1)
cv2.putText(frame, label, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
det_count += 1
if conf > best_conf:
best_conf = conf
cx = (x1 + x2) / 2
cy = (y1 + y2) / 2
diameter = max(x2 - x1, y2 - y1)
best_detection = (cx, cy, diameter, conf)
# Update trajectory and check VAR
if best_detection:
px, py, diam, conf = best_detection
pos_3d = cal.pixel_to_3d(px, py, diam)
if pos_3d:
trajectory.add_observation(
pos_3d[0], pos_3d[1], pos_3d[2],
now, frame_counts[sensor_id], sensor_id, conf
)
# Check for close calls
event = event_detector.check(trajectory)
if event:
print(f"[VAR] Close call! Line: {event['line']}, "
f"Distance: {event['distance_m']*100:.0f}cm")
state['events'].append(event)
_capture_var_snapshot(frame, event)
# FPS tracking
frame_counts[sensor_id] += 1
elapsed = time.time() - start_times[sensor_id]
fps_actual = frame_counts[sensor_id] / elapsed if elapsed > 0 else 0
# HUD overlay
cv2.putText(frame, f"CAM {sensor_id} | FPS: {fps_actual:.1f}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
if det_count > 0:
cv2.putText(frame, f"Balls: {det_count}", (10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
# Encode JPEG and update shared state
_, jpeg = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
with cam['lock']:
cam['frame'] = jpeg.tobytes()
cam['fps'] = fps_actual
cam['detections'] = det_count
if frame_counts[sensor_id] % 150 == 0:
speed = trajectory.get_speed()
speed_str = f"{speed:.1f} m/s" if speed else "N/A"
print(f"[CAM {sensor_id}] Frame {frame_counts[sensor_id]}, "
f"FPS: {fps_actual:.1f}, Det: {det_count}, Speed: {speed_str}")
def main():
global _cam_readers, _args
parser = argparse.ArgumentParser(description='Pickle Vision Referee System')
parser.add_argument('--width', type=int, default=1280)
parser.add_argument('--height', type=int, default=720)
parser.add_argument('--fps', type=int, default=30)
parser.add_argument('--model', type=str, default='yolov8n.pt')
parser.add_argument('--conf', type=float, default=0.25)
parser.add_argument('--port', type=int, default=8080)
parser.add_argument('--buffer-seconds', type=int, default=10,
help='Ring buffer size in seconds for VAR clips')
parser.add_argument('--calibration-dir', type=str,
default=os.path.join(os.path.dirname(__file__), 'config'),
help='Directory with calibration JSON files')
args = parser.parse_args()
_args = args
# Load YOLO model
print(f"Loading YOLO model: {args.model}")
model = YOLO(args.model)
try:
model.to("cuda")
print("Inference on CUDA")
except Exception:
print("CUDA unavailable, using CPU")
# Initialize shared state
state['trajectory'] = TrajectoryModel(fps=args.fps)
state['event_detector'] = EventDetector(trigger_distance_m=0.3)
state['calibration_dir'] = args.calibration_dir
state['calibrate_fn'] = auto_calibrate
ring_buffer = FrameRingBuffer(max_seconds=args.buffer_seconds, fps=args.fps)
# Start with empty calibrators — user must calibrate via UI
os.makedirs(args.calibration_dir, exist_ok=True)
for sensor_id in [0, 1]:
state['calibrators'][sensor_id] = CameraCalibrator()
# Start camera readers
cam_readers = {}
for sensor_id in [0, 1]:
state['cameras'][sensor_id] = {
'frame': None, 'lock': threading.Lock(), 'fps': 0, 'detections': 0
}
cam_readers[sensor_id] = CameraReader(sensor_id, args.width, args.height, args.fps)
_cam_readers = cam_readers
# Wait for at least one camera
print("Waiting for cameras...")
for _ in range(100):
if any(r.grab() is not None for r in cam_readers.values()):
break
time.sleep(0.1)
# Start detection loop
det_thread = threading.Thread(
target=detection_loop,
args=(cam_readers, model, args.conf, ring_buffer),
daemon=True
)
det_thread.start()
time.sleep(2)
print(f"\n{'=' * 50}")
print(f" Pickle Vision Referee System")
print(f" Open in browser: http://192.168.1.253:{args.port}")
print(f"{'=' * 50}\n")
app.run(host='0.0.0.0', port=args.port, threaded=True)
if __name__ == '__main__':
main()