Files
pickle_vision/jetson/main.py
Ruslan Bakiev 6225809459 Click-to-fullscreen debug images + highlight detected lines on 3D scene
- Debug images clickable — opens fullscreen overlay (click to close)
- After calibration, detected court lines highlighted on 3D scene
  (blue for CAM0, pink for CAM1)
- Return matched_lines_3d from calibration (baseline, kitchen, sidelines, service)
- Show line names in calibration results

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 14:53:37 +07:00

603 lines
23 KiB
Python

#!/usr/bin/env python3
"""
Pickle Vision - Referee System main entry point for Jetson.
Dual CSI cameras, real-time YOLO detection, trajectory tracking, VAR triggers.
"""
import sys
import os
import cv2
import time
import base64
import argparse
import threading
import numpy as np
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from ultralytics import YOLO
from src.streaming.camera_reader import CameraReader
from src.streaming.ring_buffer import FrameRingBuffer
from src.physics.trajectory import TrajectoryModel
from src.physics.event_detector import EventDetector
from src.calibration.camera_calibrator import (
CameraCalibrator, get_half_court_3d_points,
COURT_LENGTH, COURT_WIDTH, HALF_COURT_LENGTH
)
from src.web.app import app, state
BALL_CLASS_ID = 32 # sports ball in COCO
# Global references set in main()
_cam_readers = {}
_args = None
def auto_calibrate():
"""Detect all visible court lines, identify them, compute camera 3D position.
Each camera sees one half of the pickleball court from the net.
We detect all white lines, classify them by angle and position,
try to match them to known court lines, then use solvePnP.
Returns debug images showing every detected line + camera position.
"""
results = {}
for sensor_id, reader in _cam_readers.items():
frame = reader.grab()
if frame is None:
results[str(sensor_id)] = {'ok': False, 'error': 'No frame available'}
continue
h, w = frame.shape[:2]
side = 'left' if sensor_id == 0 else 'right'
debug_frame = frame.copy()
# Detect all court lines
detection = _detect_court_lines(frame)
all_segments = detection['segments']
grouped = detection['grouped']
# Draw ALL detected line segments on debug frame
for seg in all_segments:
x1, y1, x2, y2 = seg
cv2.line(debug_frame, (x1, y1), (x2, y2), (80, 80, 80), 1)
# Draw grouped/merged lines with colors
# "across" lines (roughly horizontal in image = baseline, kitchen, net)
across_colors = [(0, 255, 255), (0, 200, 200), (0, 150, 150),
(0, 100, 200), (0, 80, 160)]
for i, line in enumerate(grouped.get('across', [])):
x1, y1, x2, y2 = line['endpoints']
color = across_colors[i % len(across_colors)]
cv2.line(debug_frame, (x1, y1), (x2, y2), color, 3)
cv2.putText(debug_frame, f"across#{i} ({line['count']}seg)",
(x1, max(y1 - 8, 15)),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, color, 1)
# "along" lines (roughly vertical in image = sidelines, center service)
along_colors = [(255, 0, 255), (200, 0, 200), (150, 0, 150),
(200, 0, 100), (160, 0, 80)]
for i, line in enumerate(grouped.get('along', [])):
x1, y1, x2, y2 = line['endpoints']
color = along_colors[i % len(along_colors)]
cv2.line(debug_frame, (x1, y1), (x2, y2), color, 3)
cv2.putText(debug_frame, f"along#{i} ({line['count']}seg)",
(x1 + 5, (y1 + y2) // 2),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, color, 1)
# Summary text
n_across = len(grouped.get('across', []))
n_along = len(grouped.get('along', []))
cv2.putText(debug_frame,
f"Lines: {len(all_segments)} raw, {n_across} across, {n_along} along",
(10, h - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
# Try to match lines to known court geometry and calibrate
match = _match_court_lines(grouped, side, w, h)
if match['points_2d'] is not None and len(match['points_2d']) >= 4:
# We have enough correspondences — calibrate
cal = CameraCalibrator()
cal.calibrate(
np.array(match['points_2d'], dtype=np.float32),
np.array(match['points_3d'], dtype=np.float32),
w, h
)
cam_pos = (-cal.rotation_matrix.T @ cal.translation_vec).flatten()
# Draw matched points
for i, (px, py) in enumerate(match['points_2d']):
cv2.circle(debug_frame, (int(px), int(py)), 6, (0, 255, 0), -1)
cv2.putText(debug_frame,
f"CAM{sensor_id}: X={cam_pos[0]:.2f} Y={cam_pos[1]:.2f} Z={cam_pos[2]:.2f}m",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.putText(debug_frame,
f"Matched {len(match['points_2d'])} points, {side} half",
(10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
# Save
cal_path = os.path.join(_args.calibration_dir,
f'cam{sensor_id}_calibration.json')
os.makedirs(os.path.dirname(cal_path), exist_ok=True)
cal.save(cal_path)
state['calibrators'][sensor_id] = cal
_, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
results[str(sensor_id)] = {
'ok': True,
'camera_position': cam_pos.tolist(),
'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'),
'lines_detected': {'across': n_across, 'along': n_along},
'points_matched': len(match['points_2d']),
'matched_lines_3d': match.get('matched_lines_3d', []),
}
print(f"[CAM {sensor_id}] Calibrated! Camera at "
f"({cam_pos[0]:.2f}, {cam_pos[1]:.2f}, {cam_pos[2]:.2f}), "
f"{len(match['points_2d'])} correspondences")
else:
cv2.putText(debug_frame,
f"Not enough correspondences: {match.get('error', 'unknown')}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
_, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
results[str(sensor_id)] = {
'ok': False,
'error': f"CAM {sensor_id}: {match.get('error', 'Not enough line correspondences')}",
'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'),
'lines_detected': {'across': n_across, 'along': n_along},
}
return results
def _detect_court_lines(frame):
"""Detect all white line segments in the frame.
Returns:
segments: list of [x1, y1, x2, y2] raw segments
grouped: dict with 'across' and 'along' merged line groups
"""
# White line detection via color thresholding + edges
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# White lines are bright — threshold to isolate them
_, white_mask = cv2.threshold(gray, 180, 255, cv2.THRESH_BINARY)
# Also use edge detection
blur = cv2.GaussianBlur(gray, (5, 5), 0)
edges = cv2.Canny(blur, 50, 150)
# Combine: edges that are also near white regions
dilated_white = cv2.dilate(white_mask, np.ones((5, 5), np.uint8))
combined = cv2.bitwise_and(edges, dilated_white)
# Hough line detection on combined mask
lines = cv2.HoughLinesP(combined, 1, np.pi / 180, threshold=50,
minLineLength=60, maxLineGap=30)
if lines is None:
return {'segments': [], 'grouped': {'across': [], 'along': []}}
segments = [line[0].tolist() for line in lines]
# Classify by angle and group nearby parallel lines
across_lines = [] # roughly horizontal (court width direction)
along_lines = [] # roughly vertical (court length direction)
for seg in segments:
x1, y1, x2, y2 = seg
angle = np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi
length = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
if length < 40:
continue
abs_angle = abs(angle)
if abs_angle < 40 or abs_angle > 140:
across_lines.append(seg)
elif 50 < abs_angle < 130:
along_lines.append(seg)
def merge_nearby(segs, position_key, threshold=30):
"""Merge line segments that are close together (same court line)."""
if not segs:
return []
# Sort by mid-position perpendicular to the line direction
def get_pos(s):
if position_key == 'y':
return (s[1] + s[3]) / 2
return (s[0] + s[2]) / 2
sorted_segs = sorted(segs, key=get_pos)
groups = []
current_group = [sorted_segs[0]]
for seg in sorted_segs[1:]:
if abs(get_pos(seg) - get_pos(current_group[-1])) < threshold:
current_group.append(seg)
else:
groups.append(current_group)
current_group = [seg]
groups.append(current_group)
# Merge each group into one representative line
merged = []
for group in groups:
all_pts = []
for s in group:
all_pts.extend([(s[0], s[1]), (s[2], s[3])])
all_pts = np.array(all_pts)
# Fit a line through all points
if len(all_pts) >= 2:
vx, vy, cx, cy = cv2.fitLine(all_pts, cv2.DIST_L2, 0, 0.01, 0.01).flatten()
# Extend line across frame
t_max = max(np.sqrt((p[0] - cx) ** 2 + (p[1] - cy) ** 2) for p in all_pts)
x1, y1 = int(cx - vx * t_max), int(cy - vy * t_max)
x2, y2 = int(cx + vx * t_max), int(cy + vy * t_max)
mid_pos = get_pos(group[0])
merged.append({
'endpoints': [x1, y1, x2, y2],
'mid_pos': mid_pos,
'count': len(group),
})
return merged
grouped = {
'across': merge_nearby(across_lines, 'y'),
'along': merge_nearby(along_lines, 'x'),
}
return {'segments': segments, 'grouped': grouped}
def _match_court_lines(grouped, side, frame_w, frame_h):
"""Try to match detected line groups to known court lines.
For a camera at the net looking at one half:
- 'across' lines map to: baseline, kitchen line (and possibly net line)
- 'along' lines map to: left sideline, right sideline, center service line
Returns dict with points_2d, points_3d for solvePnP, or error.
"""
across = grouped.get('across', [])
along = grouped.get('along', [])
if len(across) < 2 or len(along) < 2:
return {
'points_2d': None, 'points_3d': None,
'error': f'{len(across)} across + {len(along)} along lines (need 2+ each)',
}
# Sort across lines by vertical position (top of frame = far from camera = baseline)
across_sorted = sorted(across, key=lambda l: l['mid_pos'])
# Sort along lines by horizontal position
along_sorted = sorted(along, key=lambda l: l['mid_pos'])
# Known court geometry for this half
# Court: 13.4m x 6.1m, half = 6.7m
# Kitchen (NVZ) = 2.13m from net
if side == 'left':
# Camera at net (X=6.7) looking toward baseline (X=0)
# "across" lines (perpendicular to camera view):
# - baseline at X=0
# - kitchen at X=6.7-2.13=4.57
# "along" lines (parallel to camera view):
# - near sideline at Y=0
# - far sideline at Y=6.1
# - center service at Y=3.05
across_3d_x = [0, 4.57] # baseline, kitchen
along_3d_y = [0, 6.1] # near sideline, far sideline
center_service_y = 3.05
else:
# Camera at net (X=6.7) looking toward baseline (X=13.4)
across_3d_x = [13.4, 6.7 + 2.13] # baseline (far), kitchen
along_3d_y = [0, 6.1]
center_service_y = 3.05
# Match: take outermost across lines as baseline (farthest) and kitchen (closest)
# In image: top across line = far = baseline, bottom = near = kitchen
baseline_px = across_sorted[0] # topmost = farthest = baseline
kitchen_px = across_sorted[-1] # bottommost = nearest = kitchen
# Take outermost along lines as sidelines
left_sideline_px = along_sorted[0]
right_sideline_px = along_sorted[-1]
# Find intersection points as 2D-3D correspondences
def line_intersect(l1_ep, l2_ep):
x1, y1, x2, y2 = l1_ep
x3, y3, x4, y4 = l2_ep
denom = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
if abs(denom) < 1e-6:
return None
t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / denom
return [x1 + t * (x2 - x1), y1 + t * (y2 - y1)]
points_2d = []
points_3d = []
# Baseline x left sideline
pt = line_intersect(baseline_px['endpoints'], left_sideline_px['endpoints'])
if pt:
points_2d.append(pt)
points_3d.append([across_3d_x[0], along_3d_y[0], 0])
# Baseline x right sideline
pt = line_intersect(baseline_px['endpoints'], right_sideline_px['endpoints'])
if pt:
points_2d.append(pt)
points_3d.append([across_3d_x[0], along_3d_y[1], 0])
# Kitchen x right sideline
pt = line_intersect(kitchen_px['endpoints'], right_sideline_px['endpoints'])
if pt:
points_2d.append(pt)
points_3d.append([across_3d_x[1], along_3d_y[1], 0])
# Kitchen x left sideline
pt = line_intersect(kitchen_px['endpoints'], left_sideline_px['endpoints'])
if pt:
points_2d.append(pt)
points_3d.append([across_3d_x[1], along_3d_y[0], 0])
# If we have a center service line, add more correspondences
if len(along_sorted) >= 3:
center_px = along_sorted[len(along_sorted) // 2]
pt = line_intersect(baseline_px['endpoints'], center_px['endpoints'])
if pt:
points_2d.append(pt)
points_3d.append([across_3d_x[0], center_service_y, 0])
pt = line_intersect(kitchen_px['endpoints'], center_px['endpoints'])
if pt:
points_2d.append(pt)
points_3d.append([across_3d_x[1], center_service_y, 0])
# Build list of matched 3D court lines for visualization
matched_lines_3d = []
# Baseline
matched_lines_3d.append({
'name': 'baseline',
'from': [across_3d_x[0], along_3d_y[0], 0],
'to': [across_3d_x[0], along_3d_y[1], 0],
})
# Kitchen
matched_lines_3d.append({
'name': 'kitchen',
'from': [across_3d_x[1], along_3d_y[0], 0],
'to': [across_3d_x[1], along_3d_y[1], 0],
})
# Left sideline
matched_lines_3d.append({
'name': 'sideline_near',
'from': [across_3d_x[0], along_3d_y[0], 0],
'to': [across_3d_x[1], along_3d_y[0], 0],
})
# Right sideline
matched_lines_3d.append({
'name': 'sideline_far',
'from': [across_3d_x[0], along_3d_y[1], 0],
'to': [across_3d_x[1], along_3d_y[1], 0],
})
# Center service (if detected)
if len(along_sorted) >= 3:
matched_lines_3d.append({
'name': 'center_service',
'from': [across_3d_x[0], center_service_y, 0],
'to': [across_3d_x[1], center_service_y, 0],
})
if len(points_2d) < 4:
return {
'points_2d': None, 'points_3d': None,
'matched_lines_3d': matched_lines_3d,
'error': f'Only {len(points_2d)} intersection points found (need >= 4)',
}
return {
'points_2d': points_2d, 'points_3d': points_3d,
'matched_lines_3d': matched_lines_3d, 'error': None,
}
def _capture_var_snapshot(frame, event):
"""Create a snapshot for VAR event and store it in state."""
# Draw VAR overlay on frame
snapshot = frame.copy()
cv2.putText(snapshot, "VAR DETECT", (10, 40),
cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
cv2.putText(snapshot, f"Line: {event['line']} Dist: {event['distance_m']*100:.0f}cm",
(10, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
_, jpeg = cv2.imencode('.jpg', snapshot, [cv2.IMWRITE_JPEG_QUALITY, 85])
b64 = base64.b64encode(jpeg.tobytes()).decode('ascii')
state['last_var'] = {
'event': event,
'snapshot_b64': b64,
}
def detection_loop(cam_readers, model, conf_threshold, ring_buffer):
"""Main detection loop: alternate cameras, run YOLO, update state."""
frame_counts = {sid: 0 for sid in cam_readers}
start_times = {sid: time.time() for sid in cam_readers}
trajectory = state['trajectory']
event_detector = state['event_detector']
calibrators = state['calibrators']
while True:
for sensor_id, reader in cam_readers.items():
cam = state['cameras'][sensor_id]
frame = reader.grab()
if frame is None:
continue
now = time.time()
# Store raw frame in ring buffer for VAR
ring_buffer.push(frame, now, sensor_id)
# Only run detection if camera is calibrated
cal = calibrators.get(sensor_id)
is_calibrated = cal and cal.calibrated
det_count = 0
best_detection = None
best_conf = 0
if is_calibrated:
# YOLO detection — only after calibration
results = model(frame, verbose=False, classes=[BALL_CLASS_ID], conf=conf_threshold)
for r in results:
for box in r.boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0])
conf = float(box.conf[0])
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3)
label = f"Ball {conf:.0%}"
(tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)
cv2.rectangle(frame, (x1, y1 - th - 10), (x1 + tw, y1), (0, 255, 0), -1)
cv2.putText(frame, label, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
det_count += 1
if conf > best_conf:
best_conf = conf
cx = (x1 + x2) / 2
cy = (y1 + y2) / 2
diameter = max(x2 - x1, y2 - y1)
best_detection = (cx, cy, diameter, conf)
# Update trajectory and check VAR
if best_detection:
px, py, diam, conf = best_detection
pos_3d = cal.pixel_to_3d(px, py, diam)
if pos_3d:
trajectory.add_observation(
pos_3d[0], pos_3d[1], pos_3d[2],
now, frame_counts[sensor_id], sensor_id, conf
)
# Check for close calls
event = event_detector.check(trajectory)
if event:
print(f"[VAR] Close call! Line: {event['line']}, "
f"Distance: {event['distance_m']*100:.0f}cm")
state['events'].append(event)
_capture_var_snapshot(frame, event)
# FPS tracking
frame_counts[sensor_id] += 1
elapsed = time.time() - start_times[sensor_id]
fps_actual = frame_counts[sensor_id] / elapsed if elapsed > 0 else 0
# HUD overlay
cv2.putText(frame, f"CAM {sensor_id} | FPS: {fps_actual:.1f}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
if det_count > 0:
cv2.putText(frame, f"Balls: {det_count}", (10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
# Encode JPEG and update shared state
_, jpeg = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
with cam['lock']:
cam['frame'] = jpeg.tobytes()
cam['fps'] = fps_actual
cam['detections'] = det_count
if frame_counts[sensor_id] % 150 == 0:
speed = trajectory.get_speed()
speed_str = f"{speed:.1f} m/s" if speed else "N/A"
print(f"[CAM {sensor_id}] Frame {frame_counts[sensor_id]}, "
f"FPS: {fps_actual:.1f}, Det: {det_count}, Speed: {speed_str}")
def main():
global _cam_readers, _args
parser = argparse.ArgumentParser(description='Pickle Vision Referee System')
parser.add_argument('--width', type=int, default=1280)
parser.add_argument('--height', type=int, default=720)
parser.add_argument('--fps', type=int, default=30)
parser.add_argument('--model', type=str, default='yolov8n.pt')
parser.add_argument('--conf', type=float, default=0.25)
parser.add_argument('--port', type=int, default=8080)
parser.add_argument('--buffer-seconds', type=int, default=10,
help='Ring buffer size in seconds for VAR clips')
parser.add_argument('--calibration-dir', type=str,
default=os.path.join(os.path.dirname(__file__), 'config'),
help='Directory with calibration JSON files')
args = parser.parse_args()
_args = args
# Load YOLO model
print(f"Loading YOLO model: {args.model}")
model = YOLO(args.model)
try:
model.to("cuda")
print("Inference on CUDA")
except Exception:
print("CUDA unavailable, using CPU")
# Initialize shared state
state['trajectory'] = TrajectoryModel(fps=args.fps)
state['event_detector'] = EventDetector(trigger_distance_m=0.3)
state['calibration_dir'] = args.calibration_dir
state['calibrate_fn'] = auto_calibrate
ring_buffer = FrameRingBuffer(max_seconds=args.buffer_seconds, fps=args.fps)
# Start with empty calibrators — user must calibrate via UI
os.makedirs(args.calibration_dir, exist_ok=True)
for sensor_id in [0, 1]:
state['calibrators'][sensor_id] = CameraCalibrator()
# Start camera readers
cam_readers = {}
for sensor_id in [0, 1]:
state['cameras'][sensor_id] = {
'frame': None, 'lock': threading.Lock(), 'fps': 0, 'detections': 0
}
cam_readers[sensor_id] = CameraReader(sensor_id, args.width, args.height, args.fps)
_cam_readers = cam_readers
# Wait for at least one camera
print("Waiting for cameras...")
for _ in range(100):
if any(r.grab() is not None for r in cam_readers.values()):
break
time.sleep(0.1)
# Start detection loop
det_thread = threading.Thread(
target=detection_loop,
args=(cam_readers, model, args.conf, ring_buffer),
daemon=True
)
det_thread.start()
time.sleep(2)
print(f"\n{'=' * 50}")
print(f" Pickle Vision Referee System")
print(f" Open in browser: http://192.168.1.253:{args.port}")
print(f"{'=' * 50}\n")
app.run(host='0.0.0.0', port=args.port, threaded=True)
if __name__ == '__main__':
main()