Files
pickle_vision/jetson/main.py
Ruslan Bakiev e12edab19b Remove all fallbacks: show errors, draw debug lines on calibration
- Remove try-except in CameraCalibrator — errors propagate to UI
- Remove auto-load of saved calibrations — always start uncalibrated
- Remove hardcoded "Base Setup" card with fake values
- Remove addStereocameras/getCamParams dead code
- Draw all detected Hough lines + corners on debug frame during calibration
- Show debug images in calibration tab after attempt
- Show error messages in UI instead of swallowing them

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 14:27:53 +07:00

442 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Pickle Vision - Referee System main entry point for Jetson.
Dual CSI cameras, real-time YOLO detection, trajectory tracking, VAR triggers.
"""
import sys
import os
import cv2
import time
import base64
import argparse
import threading
import numpy as np
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from ultralytics import YOLO
from src.streaming.camera_reader import CameraReader
from src.streaming.ring_buffer import FrameRingBuffer
from src.physics.trajectory import TrajectoryModel
from src.physics.event_detector import EventDetector
from src.calibration.camera_calibrator import (
CameraCalibrator, get_half_court_3d_points,
COURT_LENGTH, COURT_WIDTH, HALF_COURT_LENGTH
)
from src.web.app import app, state
BALL_CLASS_ID = 32 # sports ball in COCO
# Global references set in main()
_cam_readers = {}
_args = None
def auto_calibrate():
"""One-click calibration: detect court lines from current frames,
compute camera pose, save to config.
Each camera sees one half of the court from the net position.
Detects court lines via Hough transform, finds 4 corners,
then uses solvePnP to determine camera position.
Returns debug images with detected lines drawn on them.
"""
results = {}
for sensor_id, reader in _cam_readers.items():
frame = reader.grab()
if frame is None:
results[str(sensor_id)] = {'ok': False, 'error': 'No frame available'}
continue
h, w = frame.shape[:2]
side = 'left' if sensor_id == 0 else 'right'
debug_frame = frame.copy()
# Detect court lines — returns corners + debug info
detection = _detect_court_corners(frame, side)
# Draw all detected Hough lines on debug frame
if detection and detection.get('all_lines') is not None:
for line in detection['all_lines']:
x1, y1, x2, y2 = line[0]
cv2.line(debug_frame, (x1, y1), (x2, y2), (50, 50, 50), 1)
# Draw classified lines
if detection and detection.get('horizontals'):
for line in detection['horizontals']:
x1, y1, x2, y2 = line
cv2.line(debug_frame, (x1, y1), (x2, y2), (0, 255, 255), 2) # yellow = horizontal
if detection and detection.get('verticals'):
for line in detection['verticals']:
x1, y1, x2, y2 = line
cv2.line(debug_frame, (x1, y1), (x2, y2), (255, 0, 255), 2) # magenta = vertical
# Draw selected 4 lines (top/bottom/left/right)
if detection and detection.get('selected_lines'):
sel = detection['selected_lines']
colors = {'top': (0, 255, 0), 'bottom': (0, 200, 0),
'left': (255, 128, 0), 'right': (200, 100, 0)}
for name, line in sel.items():
x1, y1, x2, y2 = line
cv2.line(debug_frame, (x1, y1), (x2, y2), colors.get(name, (255, 255, 255)), 3)
cv2.putText(debug_frame, name, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, colors.get(name, (255, 255, 255)), 1)
# Encode debug frame
_, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
debug_b64 = base64.b64encode(jpeg.tobytes()).decode('ascii')
corners_pixel = detection.get('corners') if detection else None
if corners_pixel is None:
error_detail = detection.get('error', 'Unknown') if detection else 'No lines detected at all'
results[str(sensor_id)] = {
'ok': False,
'error': f'CAM {sensor_id}: {error_detail}',
'debug_image': debug_b64,
}
continue
# Draw corners on debug frame
for i, corner in enumerate(corners_pixel):
pt = (int(corner[0]), int(corner[1]))
cv2.circle(debug_frame, pt, 8, (0, 0, 255), -1)
cv2.putText(debug_frame, f'C{i}', (pt[0] + 10, pt[1]),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
# Re-encode with corners
_, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
debug_b64 = base64.b64encode(jpeg.tobytes()).decode('ascii')
# Get known 3D coordinates for this half
corners_3d = get_half_court_3d_points(side)
# Calibrate — no try/except, let errors propagate
cal = CameraCalibrator()
cal.calibrate(
np.array(corners_pixel, dtype=np.float32),
corners_3d,
w, h
)
# Save to config
cal_path = os.path.join(_args.calibration_dir,
f'cam{sensor_id}_calibration.json')
os.makedirs(os.path.dirname(cal_path), exist_ok=True)
cal.save(cal_path)
state['calibrators'][sensor_id] = cal
# Get camera position for 3D scene
cam_pos = (-cal.rotation_matrix.T @ cal.translation_vec).flatten()
results[str(sensor_id)] = {
'ok': True,
'camera_position': cam_pos.tolist(),
'debug_image': debug_b64,
}
print(f"[CAM {sensor_id}] Calibrated! Camera at "
f"({cam_pos[0]:.1f}, {cam_pos[1]:.1f}, {cam_pos[2]:.1f})")
return results
def _detect_court_corners(frame, side):
"""Detect court corners from frame using edge detection.
Returns dict with:
corners: 4x2 numpy array or None
all_lines: raw Hough lines
horizontals: classified horizontal lines
verticals: classified vertical lines
selected_lines: the 4 lines used (top/bottom/left/right)
error: description if detection failed
"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
blur = cv2.GaussianBlur(gray, (5, 5), 0)
edges = cv2.Canny(blur, 50, 150)
# Detect lines
lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=80,
minLineLength=100, maxLineGap=20)
if lines is None or len(lines) < 4:
n = 0 if lines is None else len(lines)
return {
'corners': None, 'all_lines': lines,
'horizontals': [], 'verticals': [],
'selected_lines': {},
'error': f'Only {n} Hough lines found (need >= 4)',
}
# Classify lines into horizontal and vertical
horizontals = []
verticals = []
for line in lines:
x1, y1, x2, y2 = line[0]
angle = abs(np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi)
if angle < 30 or angle > 150:
horizontals.append(line[0])
elif 60 < angle < 120:
verticals.append(line[0])
if len(horizontals) < 2 or len(verticals) < 2:
return {
'corners': None, 'all_lines': lines,
'horizontals': [h.tolist() for h in horizontals],
'verticals': [v.tolist() for v in verticals],
'selected_lines': {},
'error': f'{len(horizontals)} horizontal, {len(verticals)} vertical lines (need >= 2 each)',
}
# Cluster lines by position to find the dominant ones
h_positions = sorted(horizontals, key=lambda l: (l[1] + l[3]) / 2)
v_positions = sorted(verticals, key=lambda l: (l[0] + l[2]) / 2)
top_line = h_positions[0]
bottom_line = h_positions[-1]
left_line = v_positions[0]
right_line = v_positions[-1]
selected = {
'top': top_line.tolist(), 'bottom': bottom_line.tolist(),
'left': left_line.tolist(), 'right': right_line.tolist(),
}
# Find intersections as corner points
def line_intersection(l1, l2):
x1, y1, x2, y2 = l1
x3, y3, x4, y4 = l2
denom = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
if abs(denom) < 1e-6:
return None
t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / denom
ix = x1 + t * (x2 - x1)
iy = y1 + t * (y2 - y1)
return [ix, iy]
corners = [
line_intersection(top_line, left_line), # TL
line_intersection(top_line, right_line), # TR
line_intersection(bottom_line, right_line), # BR
line_intersection(bottom_line, left_line), # BL
]
if any(c is None for c in corners):
return {
'corners': None, 'all_lines': lines,
'horizontals': [h.tolist() for h in horizontals],
'verticals': [v.tolist() for v in verticals],
'selected_lines': selected,
'error': 'Lines are parallel — could not find all 4 corner intersections',
}
return {
'corners': np.array(corners, dtype=np.float32),
'all_lines': lines,
'horizontals': [h.tolist() for h in horizontals],
'verticals': [v.tolist() for v in verticals],
'selected_lines': selected,
'error': None,
}
def _capture_var_snapshot(frame, event):
"""Create a snapshot for VAR event and store it in state."""
# Draw VAR overlay on frame
snapshot = frame.copy()
cv2.putText(snapshot, "VAR DETECT", (10, 40),
cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
cv2.putText(snapshot, f"Line: {event['line']} Dist: {event['distance_m']*100:.0f}cm",
(10, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
_, jpeg = cv2.imencode('.jpg', snapshot, [cv2.IMWRITE_JPEG_QUALITY, 85])
b64 = base64.b64encode(jpeg.tobytes()).decode('ascii')
state['last_var'] = {
'event': event,
'snapshot_b64': b64,
}
def detection_loop(cam_readers, model, conf_threshold, ring_buffer):
"""Main detection loop: alternate cameras, run YOLO, update state."""
frame_counts = {sid: 0 for sid in cam_readers}
start_times = {sid: time.time() for sid in cam_readers}
trajectory = state['trajectory']
event_detector = state['event_detector']
calibrators = state['calibrators']
while True:
for sensor_id, reader in cam_readers.items():
cam = state['cameras'][sensor_id]
frame = reader.grab()
if frame is None:
continue
now = time.time()
# Store raw frame in ring buffer for VAR
ring_buffer.push(frame, now, sensor_id)
# Only run detection if camera is calibrated
cal = calibrators.get(sensor_id)
is_calibrated = cal and cal.calibrated
det_count = 0
best_detection = None
best_conf = 0
if is_calibrated:
# YOLO detection — only after calibration
results = model(frame, verbose=False, classes=[BALL_CLASS_ID], conf=conf_threshold)
for r in results:
for box in r.boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0])
conf = float(box.conf[0])
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3)
label = f"Ball {conf:.0%}"
(tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)
cv2.rectangle(frame, (x1, y1 - th - 10), (x1 + tw, y1), (0, 255, 0), -1)
cv2.putText(frame, label, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
det_count += 1
if conf > best_conf:
best_conf = conf
cx = (x1 + x2) / 2
cy = (y1 + y2) / 2
diameter = max(x2 - x1, y2 - y1)
best_detection = (cx, cy, diameter, conf)
# Update trajectory and check VAR
if best_detection:
px, py, diam, conf = best_detection
pos_3d = cal.pixel_to_3d(px, py, diam)
if pos_3d:
trajectory.add_observation(
pos_3d[0], pos_3d[1], pos_3d[2],
now, frame_counts[sensor_id], sensor_id, conf
)
# Check for close calls
event = event_detector.check(trajectory)
if event:
print(f"[VAR] Close call! Line: {event['line']}, "
f"Distance: {event['distance_m']*100:.0f}cm")
state['events'].append(event)
_capture_var_snapshot(frame, event)
# FPS tracking
frame_counts[sensor_id] += 1
elapsed = time.time() - start_times[sensor_id]
fps_actual = frame_counts[sensor_id] / elapsed if elapsed > 0 else 0
# HUD overlay
cv2.putText(frame, f"CAM {sensor_id} | FPS: {fps_actual:.1f}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
if det_count > 0:
cv2.putText(frame, f"Balls: {det_count}", (10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
# Encode JPEG and update shared state
_, jpeg = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
with cam['lock']:
cam['frame'] = jpeg.tobytes()
cam['fps'] = fps_actual
cam['detections'] = det_count
if frame_counts[sensor_id] % 150 == 0:
speed = trajectory.get_speed()
speed_str = f"{speed:.1f} m/s" if speed else "N/A"
print(f"[CAM {sensor_id}] Frame {frame_counts[sensor_id]}, "
f"FPS: {fps_actual:.1f}, Det: {det_count}, Speed: {speed_str}")
def main():
global _cam_readers, _args
parser = argparse.ArgumentParser(description='Pickle Vision Referee System')
parser.add_argument('--width', type=int, default=1280)
parser.add_argument('--height', type=int, default=720)
parser.add_argument('--fps', type=int, default=30)
parser.add_argument('--model', type=str, default='yolov8n.pt')
parser.add_argument('--conf', type=float, default=0.25)
parser.add_argument('--port', type=int, default=8080)
parser.add_argument('--buffer-seconds', type=int, default=10,
help='Ring buffer size in seconds for VAR clips')
parser.add_argument('--calibration-dir', type=str,
default=os.path.join(os.path.dirname(__file__), 'config'),
help='Directory with calibration JSON files')
args = parser.parse_args()
_args = args
# Load YOLO model
print(f"Loading YOLO model: {args.model}")
model = YOLO(args.model)
try:
model.to("cuda")
print("Inference on CUDA")
except Exception:
print("CUDA unavailable, using CPU")
# Initialize shared state
state['trajectory'] = TrajectoryModel(fps=args.fps)
state['event_detector'] = EventDetector(trigger_distance_m=0.3)
state['calibration_dir'] = args.calibration_dir
state['calibrate_fn'] = auto_calibrate
ring_buffer = FrameRingBuffer(max_seconds=args.buffer_seconds, fps=args.fps)
# Start with empty calibrators — user must calibrate via UI
os.makedirs(args.calibration_dir, exist_ok=True)
for sensor_id in [0, 1]:
state['calibrators'][sensor_id] = CameraCalibrator()
# Start camera readers
cam_readers = {}
for sensor_id in [0, 1]:
state['cameras'][sensor_id] = {
'frame': None, 'lock': threading.Lock(), 'fps': 0, 'detections': 0
}
cam_readers[sensor_id] = CameraReader(sensor_id, args.width, args.height, args.fps)
_cam_readers = cam_readers
# Wait for at least one camera
print("Waiting for cameras...")
for _ in range(100):
if any(r.grab() is not None for r in cam_readers.values()):
break
time.sleep(0.1)
# Start detection loop
det_thread = threading.Thread(
target=detection_loop,
args=(cam_readers, model, args.conf, ring_buffer),
daemon=True
)
det_thread.start()
time.sleep(2)
print(f"\n{'=' * 50}")
print(f" Pickle Vision Referee System")
print(f" Open in browser: http://192.168.1.253:{args.port}")
print(f"{'=' * 50}\n")
app.run(host='0.0.0.0', port=args.port, threaded=True)
if __name__ == '__main__':
main()