Add referee system architecture: 3-tab web UI, physics, VAR detection
- src/calibration: 3D camera calibration (solvePnP, homography) - src/physics: trajectory model with gravity/drag, close call event detector - src/streaming: camera reader + ring buffer for VAR clips - src/web: Flask app with 3-tab UI (Detection, Court, Trajectory) + Three.js - jetson/main.py: unified entry point wiring all components Court tab: one-click calibration button, 3D scene with camera positions Trajectory tab: accumulated ball path, VAR panel with snapshot + timer Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
389
jetson/main.py
Normal file
389
jetson/main.py
Normal file
@@ -0,0 +1,389 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Pickle Vision - Referee System main entry point for Jetson.
|
||||
Dual CSI cameras, real-time YOLO detection, trajectory tracking, VAR triggers.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import cv2
|
||||
import time
|
||||
import base64
|
||||
import argparse
|
||||
import threading
|
||||
import numpy as np
|
||||
|
||||
# Add project root to path
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from ultralytics import YOLO
|
||||
from src.streaming.camera_reader import CameraReader
|
||||
from src.streaming.ring_buffer import FrameRingBuffer
|
||||
from src.physics.trajectory import TrajectoryModel
|
||||
from src.physics.event_detector import EventDetector
|
||||
from src.calibration.camera_calibrator import (
|
||||
CameraCalibrator, get_half_court_3d_points,
|
||||
COURT_LENGTH, COURT_WIDTH, HALF_COURT_LENGTH
|
||||
)
|
||||
from src.web.app import app, state
|
||||
|
||||
BALL_CLASS_ID = 32 # sports ball in COCO
|
||||
|
||||
# Global references set in main()
|
||||
_cam_readers = {}
|
||||
_args = None
|
||||
|
||||
|
||||
def auto_calibrate():
|
||||
"""One-click calibration: detect court lines from current frames,
|
||||
compute camera pose, save to config.
|
||||
|
||||
Each camera sees one half of the court from the net position.
|
||||
We use the court lines visible in each frame to build correspondences.
|
||||
|
||||
For now: use a simple approach — detect the 4 most prominent lines
|
||||
(baseline, two sidelines, kitchen line) and map to known 3D coords.
|
||||
Falls back to estimated corners based on frame geometry if detection fails.
|
||||
"""
|
||||
results = {}
|
||||
|
||||
for sensor_id, reader in _cam_readers.items():
|
||||
frame = reader.grab()
|
||||
if frame is None:
|
||||
results[str(sensor_id)] = {'ok': False, 'error': 'No frame available'}
|
||||
continue
|
||||
|
||||
h, w = frame.shape[:2]
|
||||
side = 'left' if sensor_id == 0 else 'right'
|
||||
|
||||
# Try to detect court lines using edge detection + Hough
|
||||
corners_pixel = _detect_court_corners(frame, side)
|
||||
|
||||
if corners_pixel is None:
|
||||
# Fallback: estimate corners from typical camera-at-net perspective
|
||||
corners_pixel = _estimate_corners_from_net(w, h, side)
|
||||
|
||||
# Get known 3D coordinates for this half
|
||||
corners_3d = get_half_court_3d_points(side)
|
||||
|
||||
# Calibrate
|
||||
cal = CameraCalibrator()
|
||||
ok = cal.calibrate(
|
||||
np.array(corners_pixel, dtype=np.float32),
|
||||
corners_3d,
|
||||
w, h
|
||||
)
|
||||
|
||||
if ok:
|
||||
# Save to config
|
||||
cal_path = os.path.join(_args.calibration_dir,
|
||||
f'cam{sensor_id}_calibration.json')
|
||||
os.makedirs(os.path.dirname(cal_path), exist_ok=True)
|
||||
cal.save(cal_path)
|
||||
|
||||
state['calibrators'][sensor_id] = cal
|
||||
|
||||
# Get camera position for 3D scene
|
||||
cam_pos = (-cal.rotation_matrix.T @ cal.translation_vec).flatten()
|
||||
|
||||
results[str(sensor_id)] = {
|
||||
'ok': True,
|
||||
'camera_position': cam_pos.tolist(),
|
||||
'corners_pixel': corners_pixel.tolist() if isinstance(corners_pixel, np.ndarray)
|
||||
else corners_pixel,
|
||||
}
|
||||
print(f"[CAM {sensor_id}] Calibrated! Camera at "
|
||||
f"({cam_pos[0]:.1f}, {cam_pos[1]:.1f}, {cam_pos[2]:.1f})")
|
||||
else:
|
||||
results[str(sensor_id)] = {'ok': False, 'error': 'Calibration failed'}
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def _detect_court_corners(frame, side):
|
||||
"""Detect court corners from frame using edge detection.
|
||||
|
||||
Returns 4 corner points as numpy array, or None if detection fails.
|
||||
"""
|
||||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||
blur = cv2.GaussianBlur(gray, (5, 5), 0)
|
||||
edges = cv2.Canny(blur, 50, 150)
|
||||
|
||||
# Detect lines
|
||||
lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=80,
|
||||
minLineLength=100, maxLineGap=20)
|
||||
|
||||
if lines is None or len(lines) < 4:
|
||||
return None
|
||||
|
||||
# Classify lines into horizontal and vertical
|
||||
horizontals = []
|
||||
verticals = []
|
||||
|
||||
for line in lines:
|
||||
x1, y1, x2, y2 = line[0]
|
||||
angle = abs(np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi)
|
||||
|
||||
if angle < 30 or angle > 150:
|
||||
horizontals.append(line[0])
|
||||
elif 60 < angle < 120:
|
||||
verticals.append(line[0])
|
||||
|
||||
if len(horizontals) < 2 or len(verticals) < 2:
|
||||
return None
|
||||
|
||||
# Cluster lines by position to find the dominant ones
|
||||
h_positions = sorted(horizontals, key=lambda l: (l[1] + l[3]) / 2)
|
||||
v_positions = sorted(verticals, key=lambda l: (l[0] + l[2]) / 2)
|
||||
|
||||
# Take the most separated horizontal pair (top and bottom court lines)
|
||||
top_line = h_positions[0]
|
||||
bottom_line = h_positions[-1]
|
||||
|
||||
# Take the most separated vertical pair (left and right sidelines)
|
||||
left_line = v_positions[0]
|
||||
right_line = v_positions[-1]
|
||||
|
||||
# Find intersections as corner points
|
||||
def line_intersection(l1, l2):
|
||||
x1, y1, x2, y2 = l1
|
||||
x3, y3, x4, y4 = l2
|
||||
denom = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
|
||||
if abs(denom) < 1e-6:
|
||||
return None
|
||||
t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / denom
|
||||
ix = x1 + t * (x2 - x1)
|
||||
iy = y1 + t * (y2 - y1)
|
||||
return [ix, iy]
|
||||
|
||||
corners = [
|
||||
line_intersection(top_line, left_line), # TL
|
||||
line_intersection(top_line, right_line), # TR
|
||||
line_intersection(bottom_line, right_line), # BR
|
||||
line_intersection(bottom_line, left_line), # BL
|
||||
]
|
||||
|
||||
if any(c is None for c in corners):
|
||||
return None
|
||||
|
||||
return np.array(corners, dtype=np.float32)
|
||||
|
||||
|
||||
def _estimate_corners_from_net(w, h, side):
|
||||
"""Estimate court corner positions for a camera mounted at the net.
|
||||
|
||||
Camera looks down the half-court. Perspective: far baseline is smaller
|
||||
(top of frame), near net line is wider (bottom of frame).
|
||||
"""
|
||||
# Near edge (bottom of frame, close to camera = at the net)
|
||||
near_left = [w * 0.05, h * 0.85]
|
||||
near_right = [w * 0.95, h * 0.85]
|
||||
|
||||
# Far edge (top of frame, baseline = far from camera)
|
||||
far_left = [w * 0.20, h * 0.15]
|
||||
far_right = [w * 0.80, h * 0.15]
|
||||
|
||||
# For camera at net looking at court half:
|
||||
# - bottom of frame = net line (near)
|
||||
# - top of frame = baseline (far)
|
||||
# Order must match get_half_court_3d_points output
|
||||
if side == 'left':
|
||||
# 3D order: baseline near-left, baseline near-right, net far-right, net far-left
|
||||
return np.array([far_left, far_right, near_right, near_left], dtype=np.float32)
|
||||
else:
|
||||
return np.array([far_left, far_right, near_right, near_left], dtype=np.float32)
|
||||
|
||||
|
||||
def _capture_var_snapshot(frame, event):
|
||||
"""Create a snapshot for VAR event and store it in state."""
|
||||
# Draw VAR overlay on frame
|
||||
snapshot = frame.copy()
|
||||
cv2.putText(snapshot, "VAR DETECT", (10, 40),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
|
||||
cv2.putText(snapshot, f"Line: {event['line']} Dist: {event['distance_m']*100:.0f}cm",
|
||||
(10, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
|
||||
|
||||
_, jpeg = cv2.imencode('.jpg', snapshot, [cv2.IMWRITE_JPEG_QUALITY, 85])
|
||||
b64 = base64.b64encode(jpeg.tobytes()).decode('ascii')
|
||||
|
||||
state['last_var'] = {
|
||||
'event': event,
|
||||
'snapshot_b64': b64,
|
||||
}
|
||||
|
||||
|
||||
def detection_loop(cam_readers, model, conf_threshold, ring_buffer):
|
||||
"""Main detection loop: alternate cameras, run YOLO, update state."""
|
||||
frame_counts = {sid: 0 for sid in cam_readers}
|
||||
start_times = {sid: time.time() for sid in cam_readers}
|
||||
trajectory = state['trajectory']
|
||||
event_detector = state['event_detector']
|
||||
calibrators = state['calibrators']
|
||||
|
||||
while True:
|
||||
for sensor_id, reader in cam_readers.items():
|
||||
cam = state['cameras'][sensor_id]
|
||||
|
||||
frame = reader.grab()
|
||||
if frame is None:
|
||||
continue
|
||||
|
||||
now = time.time()
|
||||
|
||||
# Store raw frame in ring buffer for VAR
|
||||
ring_buffer.push(frame, now, sensor_id)
|
||||
|
||||
# YOLO detection
|
||||
results = model(frame, verbose=False, classes=[BALL_CLASS_ID], conf=conf_threshold)
|
||||
|
||||
det_count = 0
|
||||
best_detection = None
|
||||
best_conf = 0
|
||||
|
||||
for r in results:
|
||||
for box in r.boxes:
|
||||
x1, y1, x2, y2 = map(int, box.xyxy[0])
|
||||
conf = float(box.conf[0])
|
||||
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3)
|
||||
label = f"Ball {conf:.0%}"
|
||||
(tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)
|
||||
cv2.rectangle(frame, (x1, y1 - th - 10), (x1 + tw, y1), (0, 255, 0), -1)
|
||||
cv2.putText(frame, label, (x1, y1 - 5),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
|
||||
det_count += 1
|
||||
|
||||
if conf > best_conf:
|
||||
best_conf = conf
|
||||
cx = (x1 + x2) / 2
|
||||
cy = (y1 + y2) / 2
|
||||
diameter = max(x2 - x1, y2 - y1)
|
||||
best_detection = (cx, cy, diameter, conf)
|
||||
|
||||
# Update trajectory if we have calibration and a detection
|
||||
if best_detection and sensor_id in calibrators:
|
||||
cal = calibrators[sensor_id]
|
||||
if cal.calibrated:
|
||||
px, py, diam, conf = best_detection
|
||||
pos_3d = cal.pixel_to_3d(px, py, diam)
|
||||
if pos_3d:
|
||||
trajectory.add_observation(
|
||||
pos_3d[0], pos_3d[1], pos_3d[2],
|
||||
now, frame_counts[sensor_id], sensor_id, conf
|
||||
)
|
||||
|
||||
# Check for close calls
|
||||
event = event_detector.check(trajectory)
|
||||
if event:
|
||||
print(f"[VAR] Close call! Line: {event['line']}, "
|
||||
f"Distance: {event['distance_m']*100:.0f}cm")
|
||||
state['events'].append(event)
|
||||
_capture_var_snapshot(frame, event)
|
||||
|
||||
# FPS tracking
|
||||
frame_counts[sensor_id] += 1
|
||||
elapsed = time.time() - start_times[sensor_id]
|
||||
fps_actual = frame_counts[sensor_id] / elapsed if elapsed > 0 else 0
|
||||
|
||||
# HUD overlay
|
||||
cv2.putText(frame, f"CAM {sensor_id} | FPS: {fps_actual:.1f}", (10, 30),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
|
||||
if det_count > 0:
|
||||
cv2.putText(frame, f"Balls: {det_count}", (10, 60),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
|
||||
|
||||
# Encode JPEG and update shared state
|
||||
_, jpeg = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
|
||||
with cam['lock']:
|
||||
cam['frame'] = jpeg.tobytes()
|
||||
cam['fps'] = fps_actual
|
||||
cam['detections'] = det_count
|
||||
|
||||
if frame_counts[sensor_id] % 150 == 0:
|
||||
speed = trajectory.get_speed()
|
||||
speed_str = f"{speed:.1f} m/s" if speed else "N/A"
|
||||
print(f"[CAM {sensor_id}] Frame {frame_counts[sensor_id]}, "
|
||||
f"FPS: {fps_actual:.1f}, Det: {det_count}, Speed: {speed_str}")
|
||||
|
||||
|
||||
def main():
|
||||
global _cam_readers, _args
|
||||
|
||||
parser = argparse.ArgumentParser(description='Pickle Vision Referee System')
|
||||
parser.add_argument('--width', type=int, default=1280)
|
||||
parser.add_argument('--height', type=int, default=720)
|
||||
parser.add_argument('--fps', type=int, default=30)
|
||||
parser.add_argument('--model', type=str, default='yolov8n.pt')
|
||||
parser.add_argument('--conf', type=float, default=0.25)
|
||||
parser.add_argument('--port', type=int, default=8080)
|
||||
parser.add_argument('--buffer-seconds', type=int, default=10,
|
||||
help='Ring buffer size in seconds for VAR clips')
|
||||
parser.add_argument('--calibration-dir', type=str,
|
||||
default=os.path.join(os.path.dirname(__file__), 'config'),
|
||||
help='Directory with calibration JSON files')
|
||||
args = parser.parse_args()
|
||||
_args = args
|
||||
|
||||
# Load YOLO model
|
||||
print(f"Loading YOLO model: {args.model}")
|
||||
model = YOLO(args.model)
|
||||
try:
|
||||
model.to("cuda")
|
||||
print("Inference on CUDA")
|
||||
except Exception:
|
||||
print("CUDA unavailable, using CPU")
|
||||
|
||||
# Initialize shared state
|
||||
state['trajectory'] = TrajectoryModel(fps=args.fps)
|
||||
state['event_detector'] = EventDetector(trigger_distance_m=0.3)
|
||||
state['calibration_dir'] = args.calibration_dir
|
||||
state['calibrate_fn'] = auto_calibrate
|
||||
|
||||
ring_buffer = FrameRingBuffer(max_seconds=args.buffer_seconds, fps=args.fps)
|
||||
|
||||
# Load calibrations if available
|
||||
os.makedirs(args.calibration_dir, exist_ok=True)
|
||||
for sensor_id in [0, 1]:
|
||||
cal = CameraCalibrator()
|
||||
cal_path = os.path.join(args.calibration_dir, f'cam{sensor_id}_calibration.json')
|
||||
if os.path.exists(cal_path):
|
||||
if cal.load(cal_path):
|
||||
print(f"[CAM {sensor_id}] Loaded calibration from {cal_path}")
|
||||
state['calibrators'][sensor_id] = cal
|
||||
|
||||
# Start camera readers
|
||||
cam_readers = {}
|
||||
for sensor_id in [0, 1]:
|
||||
state['cameras'][sensor_id] = {
|
||||
'frame': None, 'lock': threading.Lock(), 'fps': 0, 'detections': 0
|
||||
}
|
||||
cam_readers[sensor_id] = CameraReader(sensor_id, args.width, args.height, args.fps)
|
||||
|
||||
_cam_readers = cam_readers
|
||||
|
||||
# Wait for at least one camera
|
||||
print("Waiting for cameras...")
|
||||
for _ in range(100):
|
||||
if any(r.grab() is not None for r in cam_readers.values()):
|
||||
break
|
||||
time.sleep(0.1)
|
||||
|
||||
# Start detection loop
|
||||
det_thread = threading.Thread(
|
||||
target=detection_loop,
|
||||
args=(cam_readers, model, args.conf, ring_buffer),
|
||||
daemon=True
|
||||
)
|
||||
det_thread.start()
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
print(f"\n{'=' * 50}")
|
||||
print(f" Pickle Vision Referee System")
|
||||
print(f" Open in browser: http://192.168.1.253:{args.port}")
|
||||
print(f"{'=' * 50}\n")
|
||||
|
||||
app.run(host='0.0.0.0', port=args.port, threaded=True)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user