Files
pickle_vision/jetson/main.py

1046 lines
41 KiB
Python

#!/usr/bin/env python3
"""
Pickle Vision - Referee System main entry point for Jetson.
Dual CSI cameras, real-time YOLO detection, trajectory tracking, VAR triggers.
"""
import sys
import os
import cv2
import time
import base64
import json
import argparse
import threading
import numpy as np
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from ultralytics import YOLO
from src.streaming.camera_reader import CameraReader
from src.streaming.ring_buffer import FrameRingBuffer
from src.physics.trajectory import TrajectoryModel
from src.physics.event_detector import EventDetector
from src.calibration.camera_calibrator import (
CameraCalibrator, get_half_court_3d_points, get_half_court_intersections,
COURT_LENGTH, COURT_WIDTH, HALF_COURT_LENGTH
)
from src.web.app import app, state
BALL_CLASS_ID = 32 # sports ball in COCO
# Global references set in main()
_cam_readers = {}
_args = None
def _init_calibration_steps():
"""Initialize ordered calibration step status map for UI."""
return {
'green_mask': {'status': 'pending', 'detail': 'Not started'},
'line_segments': {'status': 'pending', 'detail': 'Not started'},
'merged_lines': {'status': 'pending', 'detail': 'Not started'},
'intersections': {'status': 'pending', 'detail': 'Not started'},
'template_match': {'status': 'pending', 'detail': 'Not started'},
'camera_pose': {'status': 'pending', 'detail': 'Not started'},
'overlay': {'status': 'pending', 'detail': 'Not started'},
}
def _build_point_report(side, matched_image_points=None):
"""Build report of expected template points and which were matched."""
template = get_half_court_intersections(side)
matched_image_points = matched_image_points or {}
points = []
found_count = 0
for name, world_pt in template.items():
img_pt = matched_image_points.get(name)
is_found = img_pt is not None
if is_found:
found_count += 1
points.append({
'name': name,
'found': is_found,
'world_xyz': [float(world_pt[0]), float(world_pt[1]), float(world_pt[2])],
'image_xy': [float(img_pt[0]), float(img_pt[1])] if is_found else None,
})
target_count = len(points)
return {
'target_count': target_count,
'found_count': found_count,
'missing_count': target_count - found_count,
'points': points,
}
def _render_point_schematic(side, point_report):
"""Render flat half-court scheme with found/missing points."""
width = 760
height = 460
margin = 48
canvas = np.zeros((height, width, 3), dtype=np.uint8)
canvas[:] = (12, 18, 32)
# Half-court dimensions
half_len = HALF_COURT_LENGTH # 6.7m
court_w = COURT_WIDTH # 6.1m
kitchen_x = HALF_COURT_LENGTH - 2.13 # 4.57m from baseline
draw_w = width - 2 * margin
draw_h = height - 2 * margin
def to_px(local_x, y):
px = int(margin + (local_x / half_len) * draw_w)
py = int(margin + (y / court_w) * draw_h)
return px, py
# Court frame
tl = to_px(0, 0)
br = to_px(half_len, court_w)
cv2.rectangle(canvas, tl, br, (200, 200, 200), 2)
# Kitchen and center service line
p1 = to_px(kitchen_x, 0)
p2 = to_px(kitchen_x, court_w)
cv2.line(canvas, p1, p2, (180, 180, 180), 1)
c1 = to_px(0, court_w / 2)
c2 = to_px(kitchen_x, court_w / 2)
cv2.line(canvas, c1, c2, (140, 140, 140), 1)
# Net line
n1 = to_px(half_len, 0)
n2 = to_px(half_len, court_w)
cv2.line(canvas, n1, n2, (100, 160, 240), 2)
# Helper: convert world X to local baseline->net axis for each side
def to_local_x(world_x):
if side == 'left':
return world_x
return COURT_LENGTH - world_x
for point in point_report.get('points', []):
world = point.get('world_xyz') or [0.0, 0.0, 0.0]
local_x = to_local_x(world[0])
local_y = world[1]
px, py = to_px(local_x, local_y)
color = (70, 220, 130) if point.get('found') else (80, 80, 230)
cv2.circle(canvas, (px, py), 8, color, -1)
cv2.circle(canvas, (px, py), 12, color, 1)
cv2.putText(canvas, point.get('name', '?'), (px + 10, py - 8),
cv2.FONT_HERSHEY_SIMPLEX, 0.42, color, 1)
summary = f"Target: {point_report.get('target_count', 0)} Found: {point_report.get('found_count', 0)} Missing: {point_report.get('missing_count', 0)}"
cv2.putText(canvas, summary, (margin, height - 14),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (220, 220, 220), 1)
ok, jpg = cv2.imencode('.jpg', canvas, [cv2.IMWRITE_JPEG_QUALITY, 90])
if not ok:
return None
return base64.b64encode(jpg.tobytes()).decode('ascii')
def _render_points_on_frame(frame, point_report):
"""Render only detected template points on top of the camera frame."""
overlay = frame.copy()
found_count = 0
for point in point_report.get('points', []):
img_pt = point.get('image_xy')
if not img_pt:
continue
found_count += 1
x = int(img_pt[0])
y = int(img_pt[1])
cv2.circle(overlay, (x, y), 9, (70, 220, 130), 2)
cv2.circle(overlay, (x, y), 3, (70, 220, 130), -1)
cv2.putText(overlay, point.get('name', '?'), (x + 12, y - 6),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, (70, 220, 130), 1)
title = f"Template points: {found_count}/{point_report.get('target_count', 0)}"
cv2.rectangle(overlay, (8, 8), (360, 36), (0, 0, 0), -1)
cv2.putText(overlay, title, (14, 28),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 255, 220), 1)
ok, jpg = cv2.imencode('.jpg', overlay, [cv2.IMWRITE_JPEG_QUALITY, 88])
if not ok:
return None
return base64.b64encode(jpg.tobytes()).decode('ascii')
def _save_calibration_report(report_payload):
"""Persist last calibration report so it can be read outside UI."""
if not _args or not getattr(_args, 'calibration_dir', None):
return
report_dir = os.path.join(_args.calibration_dir, 'calibration_reports')
os.makedirs(report_dir, exist_ok=True)
# Save compact JSON without large base64 payloads.
compact = {
'timestamp': report_payload.get('timestamp'),
'result': {},
}
for sid, entry in (report_payload.get('result') or {}).items():
clean = dict(entry)
clean.pop('debug_image', None)
clean.pop('points_overlay_image', None)
clean.pop('schematic_image', None)
compact['result'][sid] = clean
latest_path = os.path.join(report_dir, 'latest.json')
ts_path = os.path.join(report_dir, f"report_{int(time.time())}.json")
for path in [latest_path, ts_path]:
try:
with open(path, 'w') as f:
json.dump(compact, f, indent=2)
except Exception as exc:
print(f"[CALIBRATION] Failed to write report {path}: {exc}")
def auto_calibrate():
"""Calibrate cameras by detecting court line intersections and matching
them to the known pickleball court template.
Algorithm:
1. Detect green court surface (search area)
2. Find white line segments on the green surface
3. Merge similar segments into distinct court lines
4. Find intersections between lines
5. Use green quad for initial homography estimate
6. Project template intersection points, match to detected ones
7. Try both left-right mirror mappings, pick sane camera position
8. Refine with solvePnP using all matched correspondences
"""
results = {}
for sensor_id, reader in _cam_readers.items():
side = 'left' if sensor_id == 0 else 'right'
steps = _init_calibration_steps()
frame = reader.grab()
if frame is None:
point_report = _build_point_report(side)
steps['green_mask'] = {'status': 'fail', 'detail': 'No frame available'}
steps['line_segments'] = {'status': 'fail', 'detail': 'Calibration aborted'}
steps['merged_lines'] = {'status': 'fail', 'detail': 'Calibration aborted'}
steps['intersections'] = {'status': 'fail', 'detail': 'Calibration aborted'}
steps['template_match'] = {'status': 'fail', 'detail': 'Calibration aborted'}
steps['camera_pose'] = {'status': 'fail', 'detail': 'Calibration aborted'}
steps['overlay'] = {'status': 'fail', 'detail': 'Calibration aborted'}
results[str(sensor_id)] = {
'ok': False,
'error': f'CAM {sensor_id}: No frame available',
'steps': steps,
'point_report': point_report,
'schematic_image': _render_point_schematic(side, point_report),
'points_overlay_image': None,
}
continue
h, w = frame.shape[:2]
debug_frame = frame.copy()
# Step 1: Detect green court mask
green_mask = _detect_green_mask(frame)
green_overlay = np.zeros_like(debug_frame)
green_overlay[green_mask > 0] = (0, 80, 0)
debug_frame = cv2.addWeighted(debug_frame, 1.0, green_overlay, 0.3, 0)
green_pct = np.count_nonzero(green_mask) / (w * h) * 100
if green_pct < 5:
point_report = _build_point_report(side)
steps['green_mask'] = {'status': 'fail', 'detail': f'Green area too small: {green_pct:.1f}%'}
steps['line_segments'] = {'status': 'fail', 'detail': 'Calibration aborted'}
steps['merged_lines'] = {'status': 'fail', 'detail': 'Calibration aborted'}
steps['intersections'] = {'status': 'fail', 'detail': 'Calibration aborted'}
steps['template_match'] = {'status': 'fail', 'detail': 'Calibration aborted'}
steps['camera_pose'] = {'status': 'fail', 'detail': 'Calibration aborted'}
steps['overlay'] = {'status': 'fail', 'detail': 'Calibration aborted'}
cv2.putText(debug_frame, f"FAILED: Green area too small ({green_pct:.0f}%)",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
_, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
results[str(sensor_id)] = {
'ok': False,
'error': f'CAM {sensor_id}: Green area too small ({green_pct:.0f}%)',
'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'),
'steps': steps,
'point_report': point_report,
'schematic_image': _render_point_schematic(side, point_report),
'points_overlay_image': _render_points_on_frame(debug_frame, point_report),
}
continue
steps['green_mask'] = {'status': 'ok', 'detail': f'Green area: {green_pct:.1f}%'}
# Step 2: Detect white line segments on court
white_segments = _detect_white_lines_on_court(frame, green_mask)
for x1, y1, x2, y2 in white_segments:
cv2.line(debug_frame, (x1, y1), (x2, y2), (255, 255, 0), 1)
if white_segments:
steps['line_segments'] = {'status': 'ok', 'detail': f'{len(white_segments)} segments detected'}
else:
steps['line_segments'] = {'status': 'fail', 'detail': '0 segments detected'}
# Step 3: Merge into distinct lines
merged = _merge_line_segments(white_segments)
for m in merged:
p1, p2 = m['p1'], m['p2']
cv2.line(debug_frame, (int(p1[0]), int(p1[1])),
(int(p2[0]), int(p2[1])), (0, 165, 255), 2)
if merged:
steps['merged_lines'] = {'status': 'ok', 'detail': f'{len(merged)} merged lines'}
else:
steps['merged_lines'] = {'status': 'fail', 'detail': '0 merged lines'}
# Step 4: Find intersections
intersections = _find_line_intersections(merged, w, h)
for ix, iy in intersections:
cv2.circle(debug_frame, (int(ix), int(iy)), 6, (0, 0, 255), -1)
if intersections:
steps['intersections'] = {'status': 'ok', 'detail': f'{len(intersections)} intersections'}
else:
steps['intersections'] = {'status': 'fail', 'detail': '0 intersections'}
cv2.putText(debug_frame,
f"{len(white_segments)} segs -> {len(merged)} lines -> {len(intersections)} pts",
(10, h - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1)
template = get_half_court_intersections(side)
quad = _find_green_quad(green_mask, w * h)
if len(intersections) < 4:
# Fallback: try green quad + both mappings
if quad is not None:
cal, cam_pos, mapping_info = _calibrate_from_quad(quad, side, w, h)
if cal is not None:
point_report = _build_point_report(side)
steps['template_match'] = {
'status': 'ok',
'detail': 'Low intersections, used quad fallback mapping'
}
_finalize_calibration(
cal=cal,
cam_pos=cam_pos,
sensor_id=sensor_id,
side=side,
debug_frame=debug_frame,
results=results,
method_info=mapping_info,
steps=steps,
n_segments=len(white_segments),
n_lines=len(merged),
n_intersections=len(intersections),
n_points_matched=0,
point_report=point_report,
)
continue
point_report = _build_point_report(side)
steps['template_match'] = {'status': 'fail', 'detail': 'Not enough intersections for template fit'}
steps['camera_pose'] = {'status': 'fail', 'detail': 'solvePnP skipped'}
steps['overlay'] = {'status': 'fail', 'detail': 'No calibration output'}
cv2.putText(debug_frame, f"FAILED: Need 4+ intersections, got {len(intersections)}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
_, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
results[str(sensor_id)] = {
'ok': False,
'error': f'CAM {sensor_id}: {len(intersections)} intersections (need 4+) from {len(merged)} lines',
'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'),
'steps': steps,
'point_report': point_report,
'schematic_image': _render_point_schematic(side, point_report),
'points_overlay_image': _render_points_on_frame(debug_frame, point_report),
}
continue
# Step 5: Match intersections to court template
match = _match_intersections_to_template(
intersections, template, side, quad, w, h
)
if match is None or len(match['image_points']) < 4:
# Fallback to quad-only calibration
if quad is not None:
cal, cam_pos, mapping_info = _calibrate_from_quad(quad, side, w, h)
if cal is not None:
point_report = _build_point_report(side)
steps['template_match'] = {
'status': 'ok',
'detail': 'Template match weak, used quad fallback mapping'
}
_finalize_calibration(
cal=cal,
cam_pos=cam_pos,
sensor_id=sensor_id,
side=side,
debug_frame=debug_frame,
results=results,
method_info=mapping_info,
steps=steps,
n_segments=len(white_segments),
n_lines=len(merged),
n_intersections=len(intersections),
n_points_matched=0,
point_report=point_report,
)
continue
point_report = _build_point_report(side)
steps['template_match'] = {'status': 'fail', 'detail': 'Could not match intersections to template'}
steps['camera_pose'] = {'status': 'fail', 'detail': 'solvePnP skipped'}
steps['overlay'] = {'status': 'fail', 'detail': 'No calibration output'}
cv2.putText(debug_frame, "FAILED: Could not match court pattern",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
_, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
results[str(sensor_id)] = {
'ok': False,
'error': f'CAM {sensor_id}: Could not match court pattern',
'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'),
'steps': steps,
'point_report': point_report,
'schematic_image': _render_point_schematic(side, point_report),
'points_overlay_image': _render_points_on_frame(debug_frame, point_report),
}
continue
n_matched = len(match['image_points'])
point_report = _build_point_report(side, match['image_points'])
steps['template_match'] = {
'status': 'ok',
'detail': f'Matched {n_matched}/{len(template)} template points'
}
# Draw matched points
for name, (px, py) in match['image_points'].items():
cv2.circle(debug_frame, (int(px), int(py)), 10, (0, 255, 0), 2)
cv2.putText(debug_frame, name, (int(px) + 12, int(py) - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 0), 1)
# Step 6: solvePnP with matched correspondences
pts_2d = np.array(list(match['image_points'].values()), dtype=np.float32)
pts_3d = np.array(list(match['world_points'].values()), dtype=np.float32)
cal = CameraCalibrator()
try:
cal.calibrate(pts_2d, pts_3d, w, h)
except RuntimeError as exc:
steps['camera_pose'] = {'status': 'fail', 'detail': str(exc)}
steps['overlay'] = {'status': 'fail', 'detail': 'No calibration output'}
_, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
results[str(sensor_id)] = {
'ok': False,
'error': f'CAM {sensor_id}: solvePnP failed ({exc})',
'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'),
'steps': steps,
'point_report': point_report,
'schematic_image': _render_point_schematic(side, point_report),
'points_overlay_image': _render_points_on_frame(debug_frame, point_report),
}
continue
cam_pos = (-cal.rotation_matrix.T @ cal.translation_vec).flatten()
method_info = f"{n_matched} intersections matched"
# Sanity check
if cam_pos[2] < 0 or cam_pos[2] > 10:
cv2.putText(debug_frame, f"BAD Z={cam_pos[2]:.1f}m, trying quad fallback",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
if quad is not None:
cal2, cam_pos2, info = _calibrate_from_quad(quad, side, w, h)
if cal2 is not None:
cal, cam_pos = cal2, cam_pos2
method_info = f"{method_info}; fallback={info}"
_finalize_calibration(
cal=cal,
cam_pos=cam_pos,
sensor_id=sensor_id,
side=side,
debug_frame=debug_frame,
results=results,
method_info=method_info,
steps=steps,
n_segments=len(white_segments),
n_lines=len(merged),
n_intersections=len(intersections),
n_points_matched=n_matched,
point_report=point_report,
)
report_payload = {
'timestamp': time.time(),
'result': results,
}
state['last_calibration_report'] = report_payload
_save_calibration_report(report_payload)
return results
def _finalize_calibration(cal, cam_pos, sensor_id, side, debug_frame,
results, method_info, steps, n_segments, n_lines,
n_intersections, n_points_matched, point_report):
"""Save calibration and build result dict."""
steps['camera_pose'] = {
'status': 'ok',
'detail': f'Camera pose solved: X={cam_pos[0]:.2f}, Y={cam_pos[1]:.2f}, Z={cam_pos[2]:.2f}m'
}
steps['overlay'] = {
'status': 'ok',
'detail': f'Geometry projected with {n_points_matched} matched points'
}
cv2.putText(debug_frame,
f"CAM{sensor_id}: X={cam_pos[0]:.2f} Y={cam_pos[1]:.2f} Z={cam_pos[2]:.2f}m",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.putText(debug_frame,
f"{side} half | {method_info}",
(10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
cal_path = os.path.join(_args.calibration_dir,
f'cam{sensor_id}_calibration.json')
os.makedirs(os.path.dirname(cal_path), exist_ok=True)
cal.save(cal_path)
state['calibrators'][sensor_id] = cal
_, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
matched_lines_3d = _get_half_court_lines_3d(side)
schematic_image = _render_point_schematic(side, point_report)
points_overlay_image = _render_points_on_frame(debug_frame, point_report)
results[str(sensor_id)] = {
'ok': True,
'camera_position': cam_pos.tolist(),
'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'),
'points_overlay_image': points_overlay_image,
'schematic_image': schematic_image,
'point_report': point_report,
'matched_lines_3d': matched_lines_3d,
'points_matched': n_points_matched,
'intersections_detected': n_intersections,
'lines_detected': n_lines,
'segments_detected': n_segments,
'method': method_info,
'steps': steps,
}
print(f"[CAM {sensor_id}] Calibrated! Camera at "
f"({cam_pos[0]:.2f}, {cam_pos[1]:.2f}, {cam_pos[2]:.2f}) via {method_info}")
def _get_half_court_lines_3d(side):
"""Return 3D line segments for all lines of a half-court."""
if side == 'left':
bx, kx = 0, 4.57
else:
bx, kx = 13.4, 8.83
return [
{'name': 'baseline', 'from': [bx, 0, 0], 'to': [bx, 6.1, 0]},
{'name': 'kitchen', 'from': [kx, 0, 0], 'to': [kx, 6.1, 0]},
{'name': 'sideline_near', 'from': [bx, 0, 0], 'to': [kx, 0, 0]},
{'name': 'sideline_far', 'from': [bx, 6.1, 0], 'to': [kx, 6.1, 0]},
{'name': 'center_service', 'from': [bx, 3.05, 0], 'to': [kx, 3.05, 0]},
]
def _detect_green_mask(frame):
"""Detect green court surface pixels. Returns binary mask."""
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
lower_green = np.array([30, 30, 50])
upper_green = np.array([90, 255, 220])
mask = cv2.inRange(hsv, lower_green, upper_green)
kernel = np.ones((7, 7), np.uint8)
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
return mask
def _find_green_quad(green_mask, frame_area):
"""Find the boundary quadrilateral of the green court surface.
Returns 4x2 float32 array ordered TL, TR, BR, BL, or None."""
contours, _ = cv2.findContours(green_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return None
largest = max(contours, key=cv2.contourArea)
area = cv2.contourArea(largest)
if area < frame_area * 0.05:
return None
epsilon = 0.02 * cv2.arcLength(largest, True)
approx = cv2.approxPolyDP(largest, epsilon, True)
if len(approx) == 4:
quad = approx.reshape(4, 2).astype(np.float32)
else:
rect = cv2.minAreaRect(largest)
quad = cv2.boxPoints(rect).astype(np.float32)
# Order: TL, TR, BR, BL
sorted_by_y = quad[np.argsort(quad[:, 1])]
top_pair = sorted_by_y[:2][np.argsort(sorted_by_y[:2, 0])]
bottom_pair = sorted_by_y[2:][np.argsort(sorted_by_y[2:, 0])]
return np.array([top_pair[0], top_pair[1], bottom_pair[1], bottom_pair[0]],
dtype=np.float32)
def _calibrate_from_quad(quad, side, w, h):
"""Try calibrating from green quad boundary by testing both left-right mappings.
The quad is ordered TL, TR, BR, BL in image coordinates.
Camera is at net looking at one half-court, so:
- Top of image = far from camera = baseline
- Bottom of image = near camera = kitchen/net area
We don't know which sideline is left vs right, so try both.
Returns (CameraCalibrator, cam_pos, info_string) or (None, None, None).
"""
corners_3d = get_half_court_3d_points(side)
# corners_3d order: [baseline_near, baseline_far, net_far, net_near]
# baseline_near = (bx, 0, 0), baseline_far = (bx, 6.1, 0)
# net_far = (nx, 6.1, 0), net_near = (nx, 0, 0)
# Mapping A: image-left = y=0 (near sideline), image-right = y=6.1 (far sideline)
# quad: TL→baseline_near, TR→baseline_far, BR→net_far, BL→net_near
mapping_a = corners_3d[[0, 1, 2, 3]]
# Mapping B: image-left = y=6.1 (far sideline), image-right = y=0 (near sideline)
# quad: TL→baseline_far, TR→baseline_near, BR→net_near, BL→net_far
mapping_b = corners_3d[[1, 0, 3, 2]]
best_cal = None
best_pos = None
best_label = None
for label, mapping in [('A', mapping_a), ('B', mapping_b)]:
cal = CameraCalibrator()
try:
cal.calibrate(quad, mapping, w, h)
except RuntimeError:
continue
cam_pos = (-cal.rotation_matrix.T @ cal.translation_vec).flatten()
# Camera should be: above ground (z>0), reasonable height (z<5m)
if 0 < cam_pos[2] < 5:
if best_cal is None or abs(cam_pos[2] - 1.0) < abs(best_pos[2] - 1.0):
best_cal = cal
best_pos = cam_pos
best_label = label
if best_cal is None:
return None, None, None
return best_cal, best_pos, f"quad mapping {best_label}"
def _detect_white_lines_on_court(frame, green_mask):
"""Detect white line segments within the green court area.
Returns list of [x1, y1, x2, y2] segments."""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
_, white = cv2.threshold(gray, 180, 255, cv2.THRESH_BINARY)
# Only keep white pixels within/near the court
dilated_court = cv2.dilate(green_mask, np.ones((15, 15), np.uint8))
white_on_court = cv2.bitwise_and(white, dilated_court)
lines = cv2.HoughLinesP(white_on_court, 1, np.pi / 180, threshold=40,
minLineLength=50, maxLineGap=25)
if lines is None:
return []
return [line[0].tolist() for line in lines]
def _merge_line_segments(segments, angle_thresh=15, dist_thresh=30):
"""Merge nearby parallel line segments into distinct court lines.
Groups segments by angle and perpendicular distance, then represents
each group by the longest extent along the dominant direction.
Returns list of dicts with 'angle', 'p1', 'p2', 'weight'.
"""
if not segments:
return []
line_data = []
for x1, y1, x2, y2 in segments:
angle = np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi
if angle < 0:
angle += 180
length = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
mid = ((x1 + x2) / 2, (y1 + y2) / 2)
line_data.append({
'angle': angle, 'mid': mid,
'seg': (x1, y1, x2, y2), 'length': length,
})
used = [False] * len(line_data)
merged = []
for i in range(len(line_data)):
if used[i]:
continue
cluster = [line_data[i]]
used[i] = True
for j in range(i + 1, len(line_data)):
if used[j]:
continue
da = abs(line_data[i]['angle'] - line_data[j]['angle'])
if da > 90:
da = 180 - da
if da > angle_thresh:
continue
# Perpendicular distance between midpoints
dx = line_data[j]['mid'][0] - line_data[i]['mid'][0]
dy = line_data[j]['mid'][1] - line_data[i]['mid'][1]
angle_rad = line_data[i]['angle'] * np.pi / 180
perp_dist = abs(dx * np.sin(angle_rad) - dy * np.cos(angle_rad))
if perp_dist < dist_thresh:
cluster.append(line_data[j])
used[j] = True
# Merge: use weighted average angle, find extent from all endpoints
total_weight = sum(l['length'] for l in cluster)
avg_angle = sum(l['angle'] * l['length'] for l in cluster) / total_weight
angle_rad = avg_angle * np.pi / 180
dx, dy = np.cos(angle_rad), np.sin(angle_rad)
all_pts = []
for l in cluster:
sx1, sy1, sx2, sy2 = l['seg']
all_pts.extend([(sx1, sy1), (sx2, sy2)])
projections = [p[0] * dx + p[1] * dy for p in all_pts]
min_idx = int(np.argmin(projections))
max_idx = int(np.argmax(projections))
merged.append({
'angle': avg_angle,
'p1': all_pts[min_idx],
'p2': all_pts[max_idx],
'weight': total_weight,
})
return merged
def _find_line_intersections(merged_lines, img_w, img_h, margin=50):
"""Find intersection points between merged lines.
Only keeps intersections where lines are sufficiently non-parallel
and the point is within (or near) the image bounds.
Clusters nearby intersections into single points.
"""
raw_intersections = []
for i in range(len(merged_lines)):
for j in range(i + 1, len(merged_lines)):
da = abs(merged_lines[i]['angle'] - merged_lines[j]['angle'])
if da > 90:
da = 180 - da
if da < 20: # too parallel
continue
x1, y1 = merged_lines[i]['p1']
x2, y2 = merged_lines[i]['p2']
x3, y3 = merged_lines[j]['p1']
x4, y4 = merged_lines[j]['p2']
denom = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
if abs(denom) < 1e-6:
continue
t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / denom
ix = x1 + t * (x2 - x1)
iy = y1 + t * (y2 - y1)
if -margin <= ix <= img_w + margin and -margin <= iy <= img_h + margin:
raw_intersections.append((ix, iy))
# Cluster nearby intersections (within 20px)
if not raw_intersections:
return []
pts = np.array(raw_intersections, dtype=np.float32)
used = [False] * len(pts)
clustered = []
for i in range(len(pts)):
if used[i]:
continue
cluster = [pts[i]]
used[i] = True
for j in range(i + 1, len(pts)):
if used[j]:
continue
if np.linalg.norm(pts[i] - pts[j]) < 20:
cluster.append(pts[j])
used[j] = True
centroid = np.mean(cluster, axis=0)
clustered.append((float(centroid[0]), float(centroid[1])))
return clustered
def _match_intersections_to_template(intersections, template, side, quad, w, h):
"""Match detected intersections to known court template points.
Strategy:
1. Use green quad to compute initial homography (world -> image)
2. Project all 6 template points into image
3. For each projected point, find nearest detected intersection
4. Try both left-right mirror mappings
5. Return the match with more inliers and sane reprojection
Returns dict with 'image_points' and 'world_points' dicts, or None.
"""
if quad is None or len(intersections) < 4:
return None
corners_3d = get_half_court_3d_points(side)
det_pts = np.array(intersections, dtype=np.float32)
template_names = list(template.keys())
template_world = np.array([template[n][:2] for n in template_names], dtype=np.float32)
best_match = None
best_count = 0
# Try both left-right mappings for the initial homography
for mapping_3d in [corners_3d[[0, 1, 2, 3]], corners_3d[[1, 0, 3, 2]]]:
# Homography: world 2D -> image
world_2d = mapping_3d[:, :2].astype(np.float32)
H, status = cv2.findHomography(world_2d, quad)
if H is None:
continue
# Project template intersection points to image
projected = cv2.perspectiveTransform(
template_world.reshape(-1, 1, 2), H
).reshape(-1, 2)
# Match each projected point to nearest detected intersection
threshold = 50 # pixels
matched_img = {}
matched_world = {}
used_det = set()
for i, name in enumerate(template_names):
proj = projected[i]
dists = np.linalg.norm(det_pts - proj, axis=1)
order = np.argsort(dists)
for idx in order:
if dists[idx] > threshold:
break
if int(idx) not in used_det:
matched_img[name] = (float(det_pts[idx][0]), float(det_pts[idx][1]))
matched_world[name] = template[name].tolist()
used_det.add(int(idx))
break
if len(matched_img) > best_count:
best_count = len(matched_img)
best_match = {
'image_points': matched_img,
'world_points': matched_world,
}
return best_match if best_count >= 4 else None
def _capture_var_snapshot(frame, event):
"""Create a snapshot for VAR event and store it in state."""
# Draw VAR overlay on frame
snapshot = frame.copy()
cv2.putText(snapshot, "VAR DETECT", (10, 40),
cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
cv2.putText(snapshot, f"Line: {event['line']} Dist: {event['distance_m']*100:.0f}cm",
(10, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
_, jpeg = cv2.imencode('.jpg', snapshot, [cv2.IMWRITE_JPEG_QUALITY, 85])
b64 = base64.b64encode(jpeg.tobytes()).decode('ascii')
state['last_var'] = {
'event': event,
'snapshot_b64': b64,
}
def detection_loop(cam_readers, model, conf_threshold, ring_buffer):
"""Main detection loop: alternate cameras, run YOLO, update state."""
frame_counts = {sid: 0 for sid in cam_readers}
start_times = {sid: time.time() for sid in cam_readers}
trajectory = state['trajectory']
event_detector = state['event_detector']
calibrators = state['calibrators']
while True:
for sensor_id, reader in cam_readers.items():
cam = state['cameras'][sensor_id]
frame = reader.grab()
if frame is None:
continue
now = time.time()
# Store raw frame in ring buffer for VAR
ring_buffer.push(frame, now, sensor_id)
# Only run detection if camera is calibrated
cal = calibrators.get(sensor_id)
is_calibrated = cal and cal.calibrated
det_count = 0
best_detection = None
best_conf = 0
if is_calibrated:
# YOLO detection — only after calibration
results = model(frame, verbose=False, classes=[BALL_CLASS_ID], conf=conf_threshold)
for r in results:
for box in r.boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0])
conf = float(box.conf[0])
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3)
label = f"Ball {conf:.0%}"
(tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)
cv2.rectangle(frame, (x1, y1 - th - 10), (x1 + tw, y1), (0, 255, 0), -1)
cv2.putText(frame, label, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
det_count += 1
if conf > best_conf:
best_conf = conf
cx = (x1 + x2) / 2
cy = (y1 + y2) / 2
diameter = max(x2 - x1, y2 - y1)
best_detection = (cx, cy, diameter, conf)
# Update trajectory and check VAR
if best_detection:
px, py, diam, conf = best_detection
pos_3d = cal.pixel_to_3d(px, py, diam)
if pos_3d:
trajectory.add_observation(
pos_3d[0], pos_3d[1], pos_3d[2],
now, frame_counts[sensor_id], sensor_id, conf
)
# Check for close calls
event = event_detector.check(trajectory)
if event:
print(f"[VAR] Close call! Line: {event['line']}, "
f"Distance: {event['distance_m']*100:.0f}cm")
state['events'].append(event)
_capture_var_snapshot(frame, event)
# FPS tracking
frame_counts[sensor_id] += 1
elapsed = time.time() - start_times[sensor_id]
fps_actual = frame_counts[sensor_id] / elapsed if elapsed > 0 else 0
# HUD overlay
cv2.putText(frame, f"CAM {sensor_id} | FPS: {fps_actual:.1f}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
if det_count > 0:
cv2.putText(frame, f"Balls: {det_count}", (10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
# Encode JPEG and update shared state
_, jpeg = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
with cam['lock']:
cam['frame'] = jpeg.tobytes()
cam['fps'] = fps_actual
cam['detections'] = det_count
if frame_counts[sensor_id] % 150 == 0:
speed = trajectory.get_speed()
speed_str = f"{speed:.1f} m/s" if speed else "N/A"
print(f"[CAM {sensor_id}] Frame {frame_counts[sensor_id]}, "
f"FPS: {fps_actual:.1f}, Det: {det_count}, Speed: {speed_str}")
def main():
global _cam_readers, _args
parser = argparse.ArgumentParser(description='Pickle Vision Referee System')
parser.add_argument('--width', type=int, default=1280)
parser.add_argument('--height', type=int, default=720)
parser.add_argument('--fps', type=int, default=30)
parser.add_argument('--model', type=str, default='yolov8n.pt')
parser.add_argument('--conf', type=float, default=0.25)
parser.add_argument('--port', type=int, default=8080)
parser.add_argument('--buffer-seconds', type=int, default=10,
help='Ring buffer size in seconds for VAR clips')
parser.add_argument('--calibration-dir', type=str,
default=os.path.join(os.path.dirname(__file__), 'config'),
help='Directory with calibration JSON files')
args = parser.parse_args()
_args = args
# Load YOLO model
print(f"Loading YOLO model: {args.model}")
model = YOLO(args.model)
try:
model.to("cuda")
print("Inference on CUDA")
except Exception:
print("CUDA unavailable, using CPU")
# Initialize shared state
state['trajectory'] = TrajectoryModel(fps=args.fps)
state['event_detector'] = EventDetector(trigger_distance_m=0.3)
state['calibration_dir'] = args.calibration_dir
state['calibrate_fn'] = auto_calibrate
ring_buffer = FrameRingBuffer(max_seconds=args.buffer_seconds, fps=args.fps)
# Start with empty calibrators — user must calibrate via UI
os.makedirs(args.calibration_dir, exist_ok=True)
for sensor_id in [0, 1]:
state['calibrators'][sensor_id] = CameraCalibrator()
# Start camera readers
cam_readers = {}
for sensor_id in [0, 1]:
state['cameras'][sensor_id] = {
'frame': None, 'lock': threading.Lock(), 'fps': 0, 'detections': 0
}
cam_readers[sensor_id] = CameraReader(sensor_id, args.width, args.height, args.fps)
_cam_readers = cam_readers
# Wait for at least one camera
print("Waiting for cameras...")
for _ in range(100):
if any(r.grab() is not None for r in cam_readers.values()):
break
time.sleep(0.1)
# Start detection loop
det_thread = threading.Thread(
target=detection_loop,
args=(cam_readers, model, args.conf, ring_buffer),
daemon=True
)
det_thread.start()
time.sleep(2)
print(f"\n{'=' * 50}")
print(f" Pickle Vision Referee System")
print(f" Open in browser: http://192.168.1.253:{args.port}")
print(f"{'=' * 50}\n")
app.run(host='0.0.0.0', port=args.port, threaded=True)
if __name__ == '__main__':
main()