Files
pickle_vision/jetson/main.py
Ruslan Bakiev 4c9b48e057 Add referee system architecture: 3-tab web UI, physics, VAR detection
- src/calibration: 3D camera calibration (solvePnP, homography)
- src/physics: trajectory model with gravity/drag, close call event detector
- src/streaming: camera reader + ring buffer for VAR clips
- src/web: Flask app with 3-tab UI (Detection, Court, Trajectory) + Three.js
- jetson/main.py: unified entry point wiring all components

Court tab: one-click calibration button, 3D scene with camera positions
Trajectory tab: accumulated ball path, VAR panel with snapshot + timer

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 13:16:12 +07:00

390 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Pickle Vision - Referee System main entry point for Jetson.
Dual CSI cameras, real-time YOLO detection, trajectory tracking, VAR triggers.
"""
import sys
import os
import cv2
import time
import base64
import argparse
import threading
import numpy as np
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from ultralytics import YOLO
from src.streaming.camera_reader import CameraReader
from src.streaming.ring_buffer import FrameRingBuffer
from src.physics.trajectory import TrajectoryModel
from src.physics.event_detector import EventDetector
from src.calibration.camera_calibrator import (
CameraCalibrator, get_half_court_3d_points,
COURT_LENGTH, COURT_WIDTH, HALF_COURT_LENGTH
)
from src.web.app import app, state
BALL_CLASS_ID = 32  # "sports ball" class index in COCO (default YOLO weights)

# Global references set in main(); module-level so auto_calibrate() — invoked
# from the web layer via state['calibrate_fn'] — can reach the live camera
# readers and parsed CLI args.
_cam_readers = {}
_args = None
def auto_calibrate():
    """One-click calibration for every attached camera.

    For each camera: grab a live frame, locate the four court corners
    (Hough-line detection with a geometric fallback), pair them with the
    known 3D coordinates of that half-court, and solve the camera pose.
    Successful poses are saved to the calibration directory and published
    to the shared web state.

    Returns:
        dict keyed by stringified sensor id; each value has 'ok' plus
        either pose details or an 'error' message.
    """
    outcome = {}
    for sensor_id, reader in _cam_readers.items():
        key = str(sensor_id)
        frame = reader.grab()
        if frame is None:
            outcome[key] = {'ok': False, 'error': 'No frame available'}
            continue

        frame_h, frame_w = frame.shape[:2]
        # Sensor 0 covers the left half of the court, sensor 1 the right.
        side = 'left' if sensor_id == 0 else 'right'

        # Prefer real line detection; fall back to a canned perspective guess.
        pixel_corners = _detect_court_corners(frame, side)
        if pixel_corners is None:
            pixel_corners = _estimate_corners_from_net(frame_w, frame_h, side)

        # Known 3D coordinates for this half of the court.
        world_corners = get_half_court_3d_points(side)

        calibrator = CameraCalibrator()
        solved = calibrator.calibrate(
            np.array(pixel_corners, dtype=np.float32),
            world_corners,
            frame_w, frame_h
        )
        if not solved:
            outcome[key] = {'ok': False, 'error': 'Calibration failed'}
            continue

        # Persist the pose so it survives restarts, then publish it.
        cal_path = os.path.join(_args.calibration_dir,
                                f'cam{sensor_id}_calibration.json')
        os.makedirs(os.path.dirname(cal_path), exist_ok=True)
        calibrator.save(cal_path)
        state['calibrators'][sensor_id] = calibrator

        # Camera centre in world coordinates: C = -R^T @ t.
        cam_pos = (-calibrator.rotation_matrix.T @ calibrator.translation_vec).flatten()
        outcome[key] = {
            'ok': True,
            'camera_position': cam_pos.tolist(),
            'corners_pixel': pixel_corners.tolist() if isinstance(pixel_corners, np.ndarray)
            else pixel_corners,
        }
        print(f"[CAM {sensor_id}] Calibrated! Camera at "
              f"({cam_pos[0]:.1f}, {cam_pos[1]:.1f}, {cam_pos[2]:.1f})")
    return outcome
def _detect_court_corners(frame, side):
    """Locate the four visible court corners in *frame*.

    Pipeline: grayscale -> Gaussian blur -> Canny edges -> probabilistic
    Hough transform; the detected segments are split into near-horizontal
    and near-vertical families, and the outermost line of each family is
    intersected to produce the corners.

    Returns:
        float32 (4, 2) array ordered TL, TR, BR, BL, or None when
        detection is not confident enough. *side* is accepted for
        interface symmetry but not used here.
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    smoothed = cv2.GaussianBlur(gray, (5, 5), 0)
    edge_map = cv2.Canny(smoothed, 50, 150)

    segments = cv2.HoughLinesP(edge_map, 1, np.pi / 180, threshold=80,
                               minLineLength=100, maxLineGap=20)
    if segments is None or len(segments) < 4:
        return None

    # Bucket segments by orientation (absolute angle, degrees in [0, 180]).
    horizontals, verticals = [], []
    for seg in segments:
        x1, y1, x2, y2 = seg[0]
        angle = abs(np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi)
        if angle < 30 or angle > 150:
            horizontals.append(seg[0])
        elif 60 < angle < 120:
            verticals.append(seg[0])
    if len(horizontals) < 2 or len(verticals) < 2:
        return None

    # Sort each family by midpoint; the extremes are the court boundaries.
    by_y = sorted(horizontals, key=lambda s: (s[1] + s[3]) / 2)
    by_x = sorted(verticals, key=lambda s: (s[0] + s[2]) / 2)
    top_line, bottom_line = by_y[0], by_y[-1]
    left_line, right_line = by_x[0], by_x[-1]

    def intersect(a, b):
        # Intersection of the two infinite lines through segments a and b,
        # each given as (x1, y1, x2, y2). Returns None for (near-)parallel.
        ax1, ay1, ax2, ay2 = a
        bx1, by1, bx2, by2 = b
        denom = (ax1 - ax2) * (by1 - by2) - (ay1 - ay2) * (bx1 - bx2)
        if abs(denom) < 1e-6:
            return None
        t = ((ax1 - bx1) * (by1 - by2) - (ay1 - by1) * (bx1 - bx2)) / denom
        return [ax1 + t * (ax2 - ax1), ay1 + t * (ay2 - ay1)]

    corners = [
        intersect(top_line, left_line),      # top-left
        intersect(top_line, right_line),     # top-right
        intersect(bottom_line, right_line),  # bottom-right
        intersect(bottom_line, left_line),   # bottom-left
    ]
    if any(c is None for c in corners):
        return None
    return np.array(corners, dtype=np.float32)
def _estimate_corners_from_net(w, h, side):
    """Fallback pixel positions of the four court corners for a net camera.

    The camera is mounted at the net looking down one half of the court:
    perspective makes the near (net) edge wide at the bottom of the frame
    and the far baseline narrow near the top.

    Args:
        w: frame width in pixels.
        h: frame height in pixels.
        side: 'left' or 'right' — kept for interface symmetry with
            get_half_court_3d_points(); both halves share the same frame
            geometry, so the estimate is identical for either value.

    Returns:
        float32 (4, 2) array ordered [far_left, far_right, near_right,
        near_left], matching the point order of get_half_court_3d_points().
    """
    # Near edge: bottom of frame, at the net, widest due to perspective.
    near_left = [w * 0.05, h * 0.85]
    near_right = [w * 0.95, h * 0.85]
    # Far edge: top of frame, the baseline, narrowed by perspective.
    far_left = [w * 0.20, h * 0.15]
    far_right = [w * 0.80, h * 0.15]
    # The original if/else on `side` returned the same array for both halves;
    # a single return keeps that behavior without the duplicated dead branch.
    return np.array([far_left, far_right, near_right, near_left], dtype=np.float32)
def _capture_var_snapshot(frame, event):
    """Render a VAR overlay onto a copy of *frame*, JPEG-encode it, and
    publish it together with the triggering event as state['last_var']."""
    overlay = frame.copy()
    # Banner plus the event's line name and distance, drawn in red.
    cv2.putText(overlay, "VAR DETECT", (10, 40),
                cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
    cv2.putText(overlay, f"Line: {event['line']} Dist: {event['distance_m']*100:.0f}cm",
                (10, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
    # JPEG @ quality 85, shipped to the browser as base64 text.
    _, encoded = cv2.imencode('.jpg', overlay, [cv2.IMWRITE_JPEG_QUALITY, 85])
    payload = base64.b64encode(encoded.tobytes()).decode('ascii')
    state['last_var'] = {
        'event': event,
        'snapshot_b64': payload,
    }
def detection_loop(cam_readers, model, conf_threshold, ring_buffer):
    """Main detection loop: alternate cameras, run YOLO, update state.

    Runs forever (intended to run as a daemon thread). For each camera in
    turn: grab a frame, archive the raw frame in the VAR ring buffer, run
    ball-only YOLO inference, draw detections and a HUD on the frame, feed
    the best detection into the shared trajectory model, raise close-call
    (VAR) events, and publish the annotated JPEG into per-camera shared
    state for the web UI.

    Args:
        cam_readers: dict of sensor_id -> camera reader (must have .grab()).
        model: loaded ultralytics YOLO model.
        conf_threshold: minimum detection confidence passed to the model.
        ring_buffer: buffer receiving every raw frame for VAR clip replay.
    """
    frame_counts = {sid: 0 for sid in cam_readers}
    start_times = {sid: time.time() for sid in cam_readers}
    # Shared objects published into `state` by main() before this thread starts.
    trajectory = state['trajectory']
    event_detector = state['event_detector']
    calibrators = state['calibrators']
    while True:
        for sensor_id, reader in cam_readers.items():
            cam = state['cameras'][sensor_id]
            frame = reader.grab()
            if frame is None:
                continue
            now = time.time()
            # Store raw frame in ring buffer for VAR
            ring_buffer.push(frame, now, sensor_id)
            # YOLO detection, restricted to the COCO "sports ball" class.
            results = model(frame, verbose=False, classes=[BALL_CLASS_ID], conf=conf_threshold)
            det_count = 0
            best_detection = None
            best_conf = 0
            for r in results:
                for box in r.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    conf = float(box.conf[0])
                    # Draw the bounding box with a filled label background.
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3)
                    label = f"Ball {conf:.0%}"
                    (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)
                    cv2.rectangle(frame, (x1, y1 - th - 10), (x1 + tw, y1), (0, 255, 0), -1)
                    cv2.putText(frame, label, (x1, y1 - 5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
                    det_count += 1
                    # Track only the highest-confidence box for the trajectory.
                    if conf > best_conf:
                        best_conf = conf
                        cx = (x1 + x2) / 2
                        cy = (y1 + y2) / 2
                        # Apparent ball diameter in px — presumably used by
                        # pixel_to_3d for depth; confirm against calibrator.
                        diameter = max(x2 - x1, y2 - y1)
                        best_detection = (cx, cy, diameter, conf)
            # Update trajectory if we have calibration and a detection
            if best_detection and sensor_id in calibrators:
                cal = calibrators[sensor_id]
                if cal.calibrated:
                    px, py, diam, conf = best_detection
                    pos_3d = cal.pixel_to_3d(px, py, diam)
                    # NOTE(review): truthiness test — assumes pixel_to_3d
                    # returns None or a tuple/list, not a bare numpy array
                    # (which would raise here). Confirm against calibrator.
                    if pos_3d:
                        trajectory.add_observation(
                            pos_3d[0], pos_3d[1], pos_3d[2],
                            now, frame_counts[sensor_id], sensor_id, conf
                        )
                        # Check for close calls
                        event = event_detector.check(trajectory)
                        if event:
                            print(f"[VAR] Close call! Line: {event['line']}, "
                                  f"Distance: {event['distance_m']*100:.0f}cm")
                            state['events'].append(event)
                            _capture_var_snapshot(frame, event)
            # FPS tracking
            frame_counts[sensor_id] += 1
            elapsed = time.time() - start_times[sensor_id]
            # NOTE(review): cumulative average since thread start, not a
            # rolling/instantaneous FPS figure.
            fps_actual = frame_counts[sensor_id] / elapsed if elapsed > 0 else 0
            # HUD overlay
            cv2.putText(frame, f"CAM {sensor_id} | FPS: {fps_actual:.1f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
            if det_count > 0:
                cv2.putText(frame, f"Balls: {det_count}", (10, 60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
            # Encode JPEG and update shared state
            _, jpeg = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
            with cam['lock']:
                cam['frame'] = jpeg.tobytes()
                cam['fps'] = fps_actual
                cam['detections'] = det_count
            # Periodic console heartbeat (~every 150 frames per camera).
            if frame_counts[sensor_id] % 150 == 0:
                speed = trajectory.get_speed()
                speed_str = f"{speed:.1f} m/s" if speed else "N/A"
                print(f"[CAM {sensor_id}] Frame {frame_counts[sensor_id]}, "
                      f"FPS: {fps_actual:.1f}, Det: {det_count}, Speed: {speed_str}")
def _local_ip():
    """Best-effort LAN IP address for the startup banner.

    Opens a UDP socket toward a public address (no packet is actually sent)
    and reads back the local interface address the OS would route through.
    Falls back to 127.0.0.1 when no network is available. Replaces the
    previously hard-coded 192.168.1.253, which was wrong on any other LAN.
    """
    import socket
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(('8.8.8.8', 80))
            return s.getsockname()[0]
    except OSError:
        return '127.0.0.1'


def main():
    """Entry point: parse args, load YOLO, wire shared state, start the
    detection thread, and serve the Flask web UI (blocks forever)."""
    global _cam_readers, _args
    parser = argparse.ArgumentParser(description='Pickle Vision Referee System')
    parser.add_argument('--width', type=int, default=1280)
    parser.add_argument('--height', type=int, default=720)
    parser.add_argument('--fps', type=int, default=30)
    parser.add_argument('--model', type=str, default='yolov8n.pt')
    parser.add_argument('--conf', type=float, default=0.25)
    parser.add_argument('--port', type=int, default=8080)
    parser.add_argument('--buffer-seconds', type=int, default=10,
                        help='Ring buffer size in seconds for VAR clips')
    parser.add_argument('--calibration-dir', type=str,
                        default=os.path.join(os.path.dirname(__file__), 'config'),
                        help='Directory with calibration JSON files')
    args = parser.parse_args()
    _args = args

    # Load YOLO model; prefer GPU inference when available.
    print(f"Loading YOLO model: {args.model}")
    model = YOLO(args.model)
    try:
        model.to("cuda")
        print("Inference on CUDA")
    except Exception:
        print("CUDA unavailable, using CPU")

    # Initialize shared state consumed by the web app and detection loop.
    state['trajectory'] = TrajectoryModel(fps=args.fps)
    state['event_detector'] = EventDetector(trigger_distance_m=0.3)
    state['calibration_dir'] = args.calibration_dir
    state['calibrate_fn'] = auto_calibrate
    ring_buffer = FrameRingBuffer(max_seconds=args.buffer_seconds, fps=args.fps)

    # Load previously saved calibrations if available.
    os.makedirs(args.calibration_dir, exist_ok=True)
    for sensor_id in [0, 1]:
        cal = CameraCalibrator()
        cal_path = os.path.join(args.calibration_dir, f'cam{sensor_id}_calibration.json')
        if os.path.exists(cal_path):
            if cal.load(cal_path):
                print(f"[CAM {sensor_id}] Loaded calibration from {cal_path}")
                state['calibrators'][sensor_id] = cal

    # Start camera readers (one per CSI sensor) and their shared-state slots.
    cam_readers = {}
    for sensor_id in [0, 1]:
        state['cameras'][sensor_id] = {
            'frame': None, 'lock': threading.Lock(), 'fps': 0, 'detections': 0
        }
        cam_readers[sensor_id] = CameraReader(sensor_id, args.width, args.height, args.fps)
    _cam_readers = cam_readers

    # Wait (up to ~10 s) for at least one camera to deliver a frame.
    print("Waiting for cameras...")
    for _ in range(100):
        if any(r.grab() is not None for r in cam_readers.values()):
            break
        time.sleep(0.1)

    # Run detection in a daemon thread so Ctrl-C still exits the process.
    det_thread = threading.Thread(
        target=detection_loop,
        args=(cam_readers, model, args.conf, ring_buffer),
        daemon=True
    )
    det_thread.start()
    time.sleep(2)

    print(f"\n{'=' * 50}")
    print(f" Pickle Vision Referee System")
    print(f" Open in browser: http://{_local_ip()}:{args.port}")
    print(f"{'=' * 50}\n")
    app.run(host='0.0.0.0', port=args.port, threaded=True)


if __name__ == '__main__':
    main()