#!/usr/bin/env python3 """ Pickle Vision - Referee System main entry point for Jetson. Dual CSI cameras, real-time YOLO detection, trajectory tracking, VAR triggers. """ import sys import os import cv2 import time import base64 import argparse import threading import numpy as np # Add project root to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from ultralytics import YOLO from src.streaming.camera_reader import CameraReader from src.streaming.ring_buffer import FrameRingBuffer from src.physics.trajectory import TrajectoryModel from src.physics.event_detector import EventDetector from src.calibration.camera_calibrator import ( CameraCalibrator, get_half_court_3d_points, COURT_LENGTH, COURT_WIDTH, HALF_COURT_LENGTH ) from src.web.app import app, state BALL_CLASS_ID = 32 # sports ball in COCO # Global references set in main() _cam_readers = {} _args = None def auto_calibrate(): """One-click calibration: detect court lines from current frames, compute camera pose, save to config. Each camera sees one half of the court from the net position. We use the court lines visible in each frame to build correspondences. For now: use a simple approach — detect the 4 most prominent lines (baseline, two sidelines, kitchen line) and map to known 3D coords. Returns error if court lines cannot be detected (no fallbacks). """ results = {} for sensor_id, reader in _cam_readers.items(): frame = reader.grab() if frame is None: results[str(sensor_id)] = {'ok': False, 'error': 'No frame available'} continue h, w = frame.shape[:2] side = 'left' if sensor_id == 0 else 'right' # Try to detect court lines using edge detection + Hough corners_pixel = _detect_court_corners(frame, side) if corners_pixel is None: results[str(sensor_id)] = { 'ok': False, 'error': f'Could not detect court lines for CAM {sensor_id}. ' 'Ensure court lines are clearly visible.' } continue # Get known 3D coordinates for this half corners_3d = get_half_court_3d_points(side) # Calibrate cal = CameraCalibrator() ok = cal.calibrate( np.array(corners_pixel, dtype=np.float32), corners_3d, w, h ) if ok: # Save to config cal_path = os.path.join(_args.calibration_dir, f'cam{sensor_id}_calibration.json') os.makedirs(os.path.dirname(cal_path), exist_ok=True) cal.save(cal_path) state['calibrators'][sensor_id] = cal # Get camera position for 3D scene cam_pos = (-cal.rotation_matrix.T @ cal.translation_vec).flatten() results[str(sensor_id)] = { 'ok': True, 'camera_position': cam_pos.tolist(), 'corners_pixel': corners_pixel.tolist() if isinstance(corners_pixel, np.ndarray) else corners_pixel, } print(f"[CAM {sensor_id}] Calibrated! Camera at " f"({cam_pos[0]:.1f}, {cam_pos[1]:.1f}, {cam_pos[2]:.1f})") else: results[str(sensor_id)] = {'ok': False, 'error': 'Calibration failed'} return results def _detect_court_corners(frame, side): """Detect court corners from frame using edge detection. Returns 4 corner points as numpy array, or None if detection fails. """ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) blur = cv2.GaussianBlur(gray, (5, 5), 0) edges = cv2.Canny(blur, 50, 150) # Detect lines lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=80, minLineLength=100, maxLineGap=20) if lines is None or len(lines) < 4: return None # Classify lines into horizontal and vertical horizontals = [] verticals = [] for line in lines: x1, y1, x2, y2 = line[0] angle = abs(np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi) if angle < 30 or angle > 150: horizontals.append(line[0]) elif 60 < angle < 120: verticals.append(line[0]) if len(horizontals) < 2 or len(verticals) < 2: return None # Cluster lines by position to find the dominant ones h_positions = sorted(horizontals, key=lambda l: (l[1] + l[3]) / 2) v_positions = sorted(verticals, key=lambda l: (l[0] + l[2]) / 2) # Take the most separated horizontal pair (top and bottom court lines) top_line = h_positions[0] bottom_line = h_positions[-1] # Take the most separated vertical pair (left and right sidelines) left_line = v_positions[0] right_line = v_positions[-1] # Find intersections as corner points def line_intersection(l1, l2): x1, y1, x2, y2 = l1 x3, y3, x4, y4 = l2 denom = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4) if abs(denom) < 1e-6: return None t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / denom ix = x1 + t * (x2 - x1) iy = y1 + t * (y2 - y1) return [ix, iy] corners = [ line_intersection(top_line, left_line), # TL line_intersection(top_line, right_line), # TR line_intersection(bottom_line, right_line), # BR line_intersection(bottom_line, left_line), # BL ] if any(c is None for c in corners): return None return np.array(corners, dtype=np.float32) def _capture_var_snapshot(frame, event): """Create a snapshot for VAR event and store it in state.""" # Draw VAR overlay on frame snapshot = frame.copy() cv2.putText(snapshot, "VAR DETECT", (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3) cv2.putText(snapshot, f"Line: {event['line']} Dist: {event['distance_m']*100:.0f}cm", (10, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) _, jpeg = cv2.imencode('.jpg', snapshot, [cv2.IMWRITE_JPEG_QUALITY, 85]) b64 = base64.b64encode(jpeg.tobytes()).decode('ascii') state['last_var'] = { 'event': event, 'snapshot_b64': b64, } def detection_loop(cam_readers, model, conf_threshold, ring_buffer): """Main detection loop: alternate cameras, run YOLO, update state.""" frame_counts = {sid: 0 for sid in cam_readers} start_times = {sid: time.time() for sid in cam_readers} trajectory = state['trajectory'] event_detector = state['event_detector'] calibrators = state['calibrators'] while True: for sensor_id, reader in cam_readers.items(): cam = state['cameras'][sensor_id] frame = reader.grab() if frame is None: continue now = time.time() # Store raw frame in ring buffer for VAR ring_buffer.push(frame, now, sensor_id) # Only run detection if camera is calibrated cal = calibrators.get(sensor_id) is_calibrated = cal and cal.calibrated det_count = 0 best_detection = None best_conf = 0 if is_calibrated: # YOLO detection — only after calibration results = model(frame, verbose=False, classes=[BALL_CLASS_ID], conf=conf_threshold) for r in results: for box in r.boxes: x1, y1, x2, y2 = map(int, box.xyxy[0]) conf = float(box.conf[0]) cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3) label = f"Ball {conf:.0%}" (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2) cv2.rectangle(frame, (x1, y1 - th - 10), (x1 + tw, y1), (0, 255, 0), -1) cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2) det_count += 1 if conf > best_conf: best_conf = conf cx = (x1 + x2) / 2 cy = (y1 + y2) / 2 diameter = max(x2 - x1, y2 - y1) best_detection = (cx, cy, diameter, conf) # Update trajectory and check VAR if best_detection: px, py, diam, conf = best_detection pos_3d = cal.pixel_to_3d(px, py, diam) if pos_3d: trajectory.add_observation( pos_3d[0], pos_3d[1], pos_3d[2], now, frame_counts[sensor_id], sensor_id, conf ) # Check for close calls event = event_detector.check(trajectory) if event: print(f"[VAR] Close call! Line: {event['line']}, " f"Distance: {event['distance_m']*100:.0f}cm") state['events'].append(event) _capture_var_snapshot(frame, event) # FPS tracking frame_counts[sensor_id] += 1 elapsed = time.time() - start_times[sensor_id] fps_actual = frame_counts[sensor_id] / elapsed if elapsed > 0 else 0 # HUD overlay cv2.putText(frame, f"CAM {sensor_id} | FPS: {fps_actual:.1f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) if det_count > 0: cv2.putText(frame, f"Balls: {det_count}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) # Encode JPEG and update shared state _, jpeg = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80]) with cam['lock']: cam['frame'] = jpeg.tobytes() cam['fps'] = fps_actual cam['detections'] = det_count if frame_counts[sensor_id] % 150 == 0: speed = trajectory.get_speed() speed_str = f"{speed:.1f} m/s" if speed else "N/A" print(f"[CAM {sensor_id}] Frame {frame_counts[sensor_id]}, " f"FPS: {fps_actual:.1f}, Det: {det_count}, Speed: {speed_str}") def main(): global _cam_readers, _args parser = argparse.ArgumentParser(description='Pickle Vision Referee System') parser.add_argument('--width', type=int, default=1280) parser.add_argument('--height', type=int, default=720) parser.add_argument('--fps', type=int, default=30) parser.add_argument('--model', type=str, default='yolov8n.pt') parser.add_argument('--conf', type=float, default=0.25) parser.add_argument('--port', type=int, default=8080) parser.add_argument('--buffer-seconds', type=int, default=10, help='Ring buffer size in seconds for VAR clips') parser.add_argument('--calibration-dir', type=str, default=os.path.join(os.path.dirname(__file__), 'config'), help='Directory with calibration JSON files') args = parser.parse_args() _args = args # Load YOLO model print(f"Loading YOLO model: {args.model}") model = YOLO(args.model) try: model.to("cuda") print("Inference on CUDA") except Exception: print("CUDA unavailable, using CPU") # Initialize shared state state['trajectory'] = TrajectoryModel(fps=args.fps) state['event_detector'] = EventDetector(trigger_distance_m=0.3) state['calibration_dir'] = args.calibration_dir state['calibrate_fn'] = auto_calibrate ring_buffer = FrameRingBuffer(max_seconds=args.buffer_seconds, fps=args.fps) # Load calibrations if available os.makedirs(args.calibration_dir, exist_ok=True) for sensor_id in [0, 1]: cal = CameraCalibrator() cal_path = os.path.join(args.calibration_dir, f'cam{sensor_id}_calibration.json') if os.path.exists(cal_path): if cal.load(cal_path): print(f"[CAM {sensor_id}] Loaded calibration from {cal_path}") state['calibrators'][sensor_id] = cal # Start camera readers cam_readers = {} for sensor_id in [0, 1]: state['cameras'][sensor_id] = { 'frame': None, 'lock': threading.Lock(), 'fps': 0, 'detections': 0 } cam_readers[sensor_id] = CameraReader(sensor_id, args.width, args.height, args.fps) _cam_readers = cam_readers # Wait for at least one camera print("Waiting for cameras...") for _ in range(100): if any(r.grab() is not None for r in cam_readers.values()): break time.sleep(0.1) # Start detection loop det_thread = threading.Thread( target=detection_loop, args=(cam_readers, model, args.conf, ring_buffer), daemon=True ) det_thread.start() time.sleep(2) print(f"\n{'=' * 50}") print(f" Pickle Vision Referee System") print(f" Open in browser: http://192.168.1.253:{args.port}") print(f"{'=' * 50}\n") app.run(host='0.0.0.0', port=args.port, threaded=True) if __name__ == '__main__': main()