#!/usr/bin/env python3 """ Pickle Vision - Referee System main entry point for Jetson. Dual CSI cameras, real-time YOLO detection, trajectory tracking, VAR triggers. """ import sys import os import cv2 import time import base64 import argparse import threading import numpy as np # Add project root to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from ultralytics import YOLO from src.streaming.camera_reader import CameraReader from src.streaming.ring_buffer import FrameRingBuffer from src.physics.trajectory import TrajectoryModel from src.physics.event_detector import EventDetector from src.calibration.camera_calibrator import ( CameraCalibrator, get_half_court_3d_points, get_half_court_intersections, COURT_LENGTH, COURT_WIDTH, HALF_COURT_LENGTH ) from src.web.app import app, state BALL_CLASS_ID = 32 # sports ball in COCO # Global references set in main() _cam_readers = {} _args = None def _init_calibration_steps(): """Initialize ordered calibration step status map for UI.""" return { 'green_mask': {'status': 'pending', 'detail': 'Not started'}, 'line_segments': {'status': 'pending', 'detail': 'Not started'}, 'merged_lines': {'status': 'pending', 'detail': 'Not started'}, 'intersections': {'status': 'pending', 'detail': 'Not started'}, 'template_match': {'status': 'pending', 'detail': 'Not started'}, 'camera_pose': {'status': 'pending', 'detail': 'Not started'}, 'overlay': {'status': 'pending', 'detail': 'Not started'}, } def auto_calibrate(): """Calibrate cameras by detecting court line intersections and matching them to the known pickleball court template. Algorithm: 1. Detect green court surface (search area) 2. Find white line segments on the green surface 3. Merge similar segments into distinct court lines 4. Find intersections between lines 5. Use green quad for initial homography estimate 6. Project template intersection points, match to detected ones 7. Try both left-right mirror mappings, pick sane camera position 8. Refine with solvePnP using all matched correspondences """ results = {} for sensor_id, reader in _cam_readers.items(): steps = _init_calibration_steps() frame = reader.grab() if frame is None: steps['green_mask'] = {'status': 'fail', 'detail': 'No frame available'} steps['line_segments'] = {'status': 'fail', 'detail': 'Calibration aborted'} steps['merged_lines'] = {'status': 'fail', 'detail': 'Calibration aborted'} steps['intersections'] = {'status': 'fail', 'detail': 'Calibration aborted'} steps['template_match'] = {'status': 'fail', 'detail': 'Calibration aborted'} steps['camera_pose'] = {'status': 'fail', 'detail': 'Calibration aborted'} steps['overlay'] = {'status': 'fail', 'detail': 'Calibration aborted'} results[str(sensor_id)] = { 'ok': False, 'error': f'CAM {sensor_id}: No frame available', 'steps': steps, } continue h, w = frame.shape[:2] side = 'left' if sensor_id == 0 else 'right' debug_frame = frame.copy() # Step 1: Detect green court mask green_mask = _detect_green_mask(frame) green_overlay = np.zeros_like(debug_frame) green_overlay[green_mask > 0] = (0, 80, 0) debug_frame = cv2.addWeighted(debug_frame, 1.0, green_overlay, 0.3, 0) green_pct = np.count_nonzero(green_mask) / (w * h) * 100 if green_pct < 5: steps['green_mask'] = {'status': 'fail', 'detail': f'Green area too small: {green_pct:.1f}%'} steps['line_segments'] = {'status': 'fail', 'detail': 'Calibration aborted'} steps['merged_lines'] = {'status': 'fail', 'detail': 'Calibration aborted'} steps['intersections'] = {'status': 'fail', 'detail': 'Calibration aborted'} steps['template_match'] = {'status': 'fail', 'detail': 'Calibration aborted'} steps['camera_pose'] = {'status': 'fail', 'detail': 'Calibration aborted'} steps['overlay'] = {'status': 'fail', 'detail': 'Calibration aborted'} cv2.putText(debug_frame, f"FAILED: Green area too small ({green_pct:.0f}%)", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) _, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) results[str(sensor_id)] = { 'ok': False, 'error': f'CAM {sensor_id}: Green area too small ({green_pct:.0f}%)', 'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'), 'steps': steps, } continue steps['green_mask'] = {'status': 'ok', 'detail': f'Green area: {green_pct:.1f}%'} # Step 2: Detect white line segments on court white_segments = _detect_white_lines_on_court(frame, green_mask) for x1, y1, x2, y2 in white_segments: cv2.line(debug_frame, (x1, y1), (x2, y2), (255, 255, 0), 1) if white_segments: steps['line_segments'] = {'status': 'ok', 'detail': f'{len(white_segments)} segments detected'} else: steps['line_segments'] = {'status': 'fail', 'detail': '0 segments detected'} # Step 3: Merge into distinct lines merged = _merge_line_segments(white_segments) for m in merged: p1, p2 = m['p1'], m['p2'] cv2.line(debug_frame, (int(p1[0]), int(p1[1])), (int(p2[0]), int(p2[1])), (0, 165, 255), 2) if merged: steps['merged_lines'] = {'status': 'ok', 'detail': f'{len(merged)} merged lines'} else: steps['merged_lines'] = {'status': 'fail', 'detail': '0 merged lines'} # Step 4: Find intersections intersections = _find_line_intersections(merged, w, h) for ix, iy in intersections: cv2.circle(debug_frame, (int(ix), int(iy)), 6, (0, 0, 255), -1) if intersections: steps['intersections'] = {'status': 'ok', 'detail': f'{len(intersections)} intersections'} else: steps['intersections'] = {'status': 'fail', 'detail': '0 intersections'} cv2.putText(debug_frame, f"{len(white_segments)} segs -> {len(merged)} lines -> {len(intersections)} pts", (10, h - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1) template = get_half_court_intersections(side) quad = _find_green_quad(green_mask, w * h) if len(intersections) < 4: # Fallback: try green quad + both mappings if quad is not None: cal, cam_pos, mapping_info = _calibrate_from_quad(quad, side, w, h) if cal is not None: steps['template_match'] = { 'status': 'ok', 'detail': 'Low intersections, used quad fallback mapping' } _finalize_calibration( cal=cal, cam_pos=cam_pos, sensor_id=sensor_id, side=side, debug_frame=debug_frame, results=results, method_info=mapping_info, steps=steps, n_segments=len(white_segments), n_lines=len(merged), n_intersections=len(intersections), n_points_matched=0, ) continue steps['template_match'] = {'status': 'fail', 'detail': 'Not enough intersections for template fit'} steps['camera_pose'] = {'status': 'fail', 'detail': 'solvePnP skipped'} steps['overlay'] = {'status': 'fail', 'detail': 'No calibration output'} cv2.putText(debug_frame, f"FAILED: Need 4+ intersections, got {len(intersections)}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) _, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) results[str(sensor_id)] = { 'ok': False, 'error': f'CAM {sensor_id}: {len(intersections)} intersections (need 4+) from {len(merged)} lines', 'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'), 'steps': steps, } continue # Step 5: Match intersections to court template match = _match_intersections_to_template( intersections, template, side, quad, w, h ) if match is None or len(match['image_points']) < 4: # Fallback to quad-only calibration if quad is not None: cal, cam_pos, mapping_info = _calibrate_from_quad(quad, side, w, h) if cal is not None: steps['template_match'] = { 'status': 'ok', 'detail': 'Template match weak, used quad fallback mapping' } _finalize_calibration( cal=cal, cam_pos=cam_pos, sensor_id=sensor_id, side=side, debug_frame=debug_frame, results=results, method_info=mapping_info, steps=steps, n_segments=len(white_segments), n_lines=len(merged), n_intersections=len(intersections), n_points_matched=0, ) continue steps['template_match'] = {'status': 'fail', 'detail': 'Could not match intersections to template'} steps['camera_pose'] = {'status': 'fail', 'detail': 'solvePnP skipped'} steps['overlay'] = {'status': 'fail', 'detail': 'No calibration output'} cv2.putText(debug_frame, "FAILED: Could not match court pattern", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) _, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) results[str(sensor_id)] = { 'ok': False, 'error': f'CAM {sensor_id}: Could not match court pattern', 'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'), 'steps': steps, } continue n_matched = len(match['image_points']) steps['template_match'] = { 'status': 'ok', 'detail': f'Matched {n_matched}/{len(template)} template points' } # Draw matched points for name, (px, py) in match['image_points'].items(): cv2.circle(debug_frame, (int(px), int(py)), 10, (0, 255, 0), 2) cv2.putText(debug_frame, name, (int(px) + 12, int(py) - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 0), 1) # Step 6: solvePnP with matched correspondences pts_2d = np.array(list(match['image_points'].values()), dtype=np.float32) pts_3d = np.array(list(match['world_points'].values()), dtype=np.float32) cal = CameraCalibrator() try: cal.calibrate(pts_2d, pts_3d, w, h) except RuntimeError as exc: steps['camera_pose'] = {'status': 'fail', 'detail': str(exc)} steps['overlay'] = {'status': 'fail', 'detail': 'No calibration output'} _, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) results[str(sensor_id)] = { 'ok': False, 'error': f'CAM {sensor_id}: solvePnP failed ({exc})', 'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'), 'steps': steps, } continue cam_pos = (-cal.rotation_matrix.T @ cal.translation_vec).flatten() method_info = f"{n_matched} intersections matched" # Sanity check if cam_pos[2] < 0 or cam_pos[2] > 10: cv2.putText(debug_frame, f"BAD Z={cam_pos[2]:.1f}m, trying quad fallback", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2) if quad is not None: cal2, cam_pos2, info = _calibrate_from_quad(quad, side, w, h) if cal2 is not None: cal, cam_pos = cal2, cam_pos2 method_info = f"{method_info}; fallback={info}" _finalize_calibration( cal=cal, cam_pos=cam_pos, sensor_id=sensor_id, side=side, debug_frame=debug_frame, results=results, method_info=method_info, steps=steps, n_segments=len(white_segments), n_lines=len(merged), n_intersections=len(intersections), n_points_matched=n_matched, ) return results def _finalize_calibration(cal, cam_pos, sensor_id, side, debug_frame, results, method_info, steps, n_segments, n_lines, n_intersections, n_points_matched): """Save calibration and build result dict.""" steps['camera_pose'] = { 'status': 'ok', 'detail': f'Camera pose solved: X={cam_pos[0]:.2f}, Y={cam_pos[1]:.2f}, Z={cam_pos[2]:.2f}m' } steps['overlay'] = { 'status': 'ok', 'detail': f'Geometry projected with {n_points_matched} matched points' } cv2.putText(debug_frame, f"CAM{sensor_id}: X={cam_pos[0]:.2f} Y={cam_pos[1]:.2f} Z={cam_pos[2]:.2f}m", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) cv2.putText(debug_frame, f"{side} half | {method_info}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1) cal_path = os.path.join(_args.calibration_dir, f'cam{sensor_id}_calibration.json') os.makedirs(os.path.dirname(cal_path), exist_ok=True) cal.save(cal_path) state['calibrators'][sensor_id] = cal _, jpeg = cv2.imencode('.jpg', debug_frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) matched_lines_3d = _get_half_court_lines_3d(side) results[str(sensor_id)] = { 'ok': True, 'camera_position': cam_pos.tolist(), 'debug_image': base64.b64encode(jpeg.tobytes()).decode('ascii'), 'matched_lines_3d': matched_lines_3d, 'points_matched': n_points_matched, 'intersections_detected': n_intersections, 'lines_detected': n_lines, 'segments_detected': n_segments, 'method': method_info, 'steps': steps, } print(f"[CAM {sensor_id}] Calibrated! Camera at " f"({cam_pos[0]:.2f}, {cam_pos[1]:.2f}, {cam_pos[2]:.2f}) via {method_info}") def _get_half_court_lines_3d(side): """Return 3D line segments for all lines of a half-court.""" if side == 'left': bx, kx = 0, 4.57 else: bx, kx = 13.4, 8.83 return [ {'name': 'baseline', 'from': [bx, 0, 0], 'to': [bx, 6.1, 0]}, {'name': 'kitchen', 'from': [kx, 0, 0], 'to': [kx, 6.1, 0]}, {'name': 'sideline_near', 'from': [bx, 0, 0], 'to': [kx, 0, 0]}, {'name': 'sideline_far', 'from': [bx, 6.1, 0], 'to': [kx, 6.1, 0]}, {'name': 'center_service', 'from': [bx, 3.05, 0], 'to': [kx, 3.05, 0]}, ] def _detect_green_mask(frame): """Detect green court surface pixels. Returns binary mask.""" hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) lower_green = np.array([30, 30, 50]) upper_green = np.array([90, 255, 220]) mask = cv2.inRange(hsv, lower_green, upper_green) kernel = np.ones((7, 7), np.uint8) mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel) mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel) return mask def _find_green_quad(green_mask, frame_area): """Find the boundary quadrilateral of the green court surface. Returns 4x2 float32 array ordered TL, TR, BR, BL, or None.""" contours, _ = cv2.findContours(green_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not contours: return None largest = max(contours, key=cv2.contourArea) area = cv2.contourArea(largest) if area < frame_area * 0.05: return None epsilon = 0.02 * cv2.arcLength(largest, True) approx = cv2.approxPolyDP(largest, epsilon, True) if len(approx) == 4: quad = approx.reshape(4, 2).astype(np.float32) else: rect = cv2.minAreaRect(largest) quad = cv2.boxPoints(rect).astype(np.float32) # Order: TL, TR, BR, BL sorted_by_y = quad[np.argsort(quad[:, 1])] top_pair = sorted_by_y[:2][np.argsort(sorted_by_y[:2, 0])] bottom_pair = sorted_by_y[2:][np.argsort(sorted_by_y[2:, 0])] return np.array([top_pair[0], top_pair[1], bottom_pair[1], bottom_pair[0]], dtype=np.float32) def _calibrate_from_quad(quad, side, w, h): """Try calibrating from green quad boundary by testing both left-right mappings. The quad is ordered TL, TR, BR, BL in image coordinates. Camera is at net looking at one half-court, so: - Top of image = far from camera = baseline - Bottom of image = near camera = kitchen/net area We don't know which sideline is left vs right, so try both. Returns (CameraCalibrator, cam_pos, info_string) or (None, None, None). """ corners_3d = get_half_court_3d_points(side) # corners_3d order: [baseline_near, baseline_far, net_far, net_near] # baseline_near = (bx, 0, 0), baseline_far = (bx, 6.1, 0) # net_far = (nx, 6.1, 0), net_near = (nx, 0, 0) # Mapping A: image-left = y=0 (near sideline), image-right = y=6.1 (far sideline) # quad: TL→baseline_near, TR→baseline_far, BR→net_far, BL→net_near mapping_a = corners_3d[[0, 1, 2, 3]] # Mapping B: image-left = y=6.1 (far sideline), image-right = y=0 (near sideline) # quad: TL→baseline_far, TR→baseline_near, BR→net_near, BL→net_far mapping_b = corners_3d[[1, 0, 3, 2]] best_cal = None best_pos = None best_label = None for label, mapping in [('A', mapping_a), ('B', mapping_b)]: cal = CameraCalibrator() try: cal.calibrate(quad, mapping, w, h) except RuntimeError: continue cam_pos = (-cal.rotation_matrix.T @ cal.translation_vec).flatten() # Camera should be: above ground (z>0), reasonable height (z<5m) if 0 < cam_pos[2] < 5: if best_cal is None or abs(cam_pos[2] - 1.0) < abs(best_pos[2] - 1.0): best_cal = cal best_pos = cam_pos best_label = label if best_cal is None: return None, None, None return best_cal, best_pos, f"quad mapping {best_label}" def _detect_white_lines_on_court(frame, green_mask): """Detect white line segments within the green court area. Returns list of [x1, y1, x2, y2] segments.""" gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) _, white = cv2.threshold(gray, 180, 255, cv2.THRESH_BINARY) # Only keep white pixels within/near the court dilated_court = cv2.dilate(green_mask, np.ones((15, 15), np.uint8)) white_on_court = cv2.bitwise_and(white, dilated_court) lines = cv2.HoughLinesP(white_on_court, 1, np.pi / 180, threshold=40, minLineLength=50, maxLineGap=25) if lines is None: return [] return [line[0].tolist() for line in lines] def _merge_line_segments(segments, angle_thresh=15, dist_thresh=30): """Merge nearby parallel line segments into distinct court lines. Groups segments by angle and perpendicular distance, then represents each group by the longest extent along the dominant direction. Returns list of dicts with 'angle', 'p1', 'p2', 'weight'. """ if not segments: return [] line_data = [] for x1, y1, x2, y2 in segments: angle = np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi if angle < 0: angle += 180 length = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2) mid = ((x1 + x2) / 2, (y1 + y2) / 2) line_data.append({ 'angle': angle, 'mid': mid, 'seg': (x1, y1, x2, y2), 'length': length, }) used = [False] * len(line_data) merged = [] for i in range(len(line_data)): if used[i]: continue cluster = [line_data[i]] used[i] = True for j in range(i + 1, len(line_data)): if used[j]: continue da = abs(line_data[i]['angle'] - line_data[j]['angle']) if da > 90: da = 180 - da if da > angle_thresh: continue # Perpendicular distance between midpoints dx = line_data[j]['mid'][0] - line_data[i]['mid'][0] dy = line_data[j]['mid'][1] - line_data[i]['mid'][1] angle_rad = line_data[i]['angle'] * np.pi / 180 perp_dist = abs(dx * np.sin(angle_rad) - dy * np.cos(angle_rad)) if perp_dist < dist_thresh: cluster.append(line_data[j]) used[j] = True # Merge: use weighted average angle, find extent from all endpoints total_weight = sum(l['length'] for l in cluster) avg_angle = sum(l['angle'] * l['length'] for l in cluster) / total_weight angle_rad = avg_angle * np.pi / 180 dx, dy = np.cos(angle_rad), np.sin(angle_rad) all_pts = [] for l in cluster: sx1, sy1, sx2, sy2 = l['seg'] all_pts.extend([(sx1, sy1), (sx2, sy2)]) projections = [p[0] * dx + p[1] * dy for p in all_pts] min_idx = int(np.argmin(projections)) max_idx = int(np.argmax(projections)) merged.append({ 'angle': avg_angle, 'p1': all_pts[min_idx], 'p2': all_pts[max_idx], 'weight': total_weight, }) return merged def _find_line_intersections(merged_lines, img_w, img_h, margin=50): """Find intersection points between merged lines. Only keeps intersections where lines are sufficiently non-parallel and the point is within (or near) the image bounds. Clusters nearby intersections into single points. """ raw_intersections = [] for i in range(len(merged_lines)): for j in range(i + 1, len(merged_lines)): da = abs(merged_lines[i]['angle'] - merged_lines[j]['angle']) if da > 90: da = 180 - da if da < 20: # too parallel continue x1, y1 = merged_lines[i]['p1'] x2, y2 = merged_lines[i]['p2'] x3, y3 = merged_lines[j]['p1'] x4, y4 = merged_lines[j]['p2'] denom = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4) if abs(denom) < 1e-6: continue t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / denom ix = x1 + t * (x2 - x1) iy = y1 + t * (y2 - y1) if -margin <= ix <= img_w + margin and -margin <= iy <= img_h + margin: raw_intersections.append((ix, iy)) # Cluster nearby intersections (within 20px) if not raw_intersections: return [] pts = np.array(raw_intersections, dtype=np.float32) used = [False] * len(pts) clustered = [] for i in range(len(pts)): if used[i]: continue cluster = [pts[i]] used[i] = True for j in range(i + 1, len(pts)): if used[j]: continue if np.linalg.norm(pts[i] - pts[j]) < 20: cluster.append(pts[j]) used[j] = True centroid = np.mean(cluster, axis=0) clustered.append((float(centroid[0]), float(centroid[1]))) return clustered def _match_intersections_to_template(intersections, template, side, quad, w, h): """Match detected intersections to known court template points. Strategy: 1. Use green quad to compute initial homography (world -> image) 2. Project all 6 template points into image 3. For each projected point, find nearest detected intersection 4. Try both left-right mirror mappings 5. Return the match with more inliers and sane reprojection Returns dict with 'image_points' and 'world_points' dicts, or None. """ if quad is None or len(intersections) < 4: return None corners_3d = get_half_court_3d_points(side) det_pts = np.array(intersections, dtype=np.float32) template_names = list(template.keys()) template_world = np.array([template[n][:2] for n in template_names], dtype=np.float32) best_match = None best_count = 0 # Try both left-right mappings for the initial homography for mapping_3d in [corners_3d[[0, 1, 2, 3]], corners_3d[[1, 0, 3, 2]]]: # Homography: world 2D -> image world_2d = mapping_3d[:, :2].astype(np.float32) H, status = cv2.findHomography(world_2d, quad) if H is None: continue # Project template intersection points to image projected = cv2.perspectiveTransform( template_world.reshape(-1, 1, 2), H ).reshape(-1, 2) # Match each projected point to nearest detected intersection threshold = 50 # pixels matched_img = {} matched_world = {} used_det = set() for i, name in enumerate(template_names): proj = projected[i] dists = np.linalg.norm(det_pts - proj, axis=1) order = np.argsort(dists) for idx in order: if dists[idx] > threshold: break if int(idx) not in used_det: matched_img[name] = (float(det_pts[idx][0]), float(det_pts[idx][1])) matched_world[name] = template[name].tolist() used_det.add(int(idx)) break if len(matched_img) > best_count: best_count = len(matched_img) best_match = { 'image_points': matched_img, 'world_points': matched_world, } return best_match if best_count >= 4 else None def _capture_var_snapshot(frame, event): """Create a snapshot for VAR event and store it in state.""" # Draw VAR overlay on frame snapshot = frame.copy() cv2.putText(snapshot, "VAR DETECT", (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3) cv2.putText(snapshot, f"Line: {event['line']} Dist: {event['distance_m']*100:.0f}cm", (10, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) _, jpeg = cv2.imencode('.jpg', snapshot, [cv2.IMWRITE_JPEG_QUALITY, 85]) b64 = base64.b64encode(jpeg.tobytes()).decode('ascii') state['last_var'] = { 'event': event, 'snapshot_b64': b64, } def detection_loop(cam_readers, model, conf_threshold, ring_buffer): """Main detection loop: alternate cameras, run YOLO, update state.""" frame_counts = {sid: 0 for sid in cam_readers} start_times = {sid: time.time() for sid in cam_readers} trajectory = state['trajectory'] event_detector = state['event_detector'] calibrators = state['calibrators'] while True: for sensor_id, reader in cam_readers.items(): cam = state['cameras'][sensor_id] frame = reader.grab() if frame is None: continue now = time.time() # Store raw frame in ring buffer for VAR ring_buffer.push(frame, now, sensor_id) # Only run detection if camera is calibrated cal = calibrators.get(sensor_id) is_calibrated = cal and cal.calibrated det_count = 0 best_detection = None best_conf = 0 if is_calibrated: # YOLO detection — only after calibration results = model(frame, verbose=False, classes=[BALL_CLASS_ID], conf=conf_threshold) for r in results: for box in r.boxes: x1, y1, x2, y2 = map(int, box.xyxy[0]) conf = float(box.conf[0]) cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3) label = f"Ball {conf:.0%}" (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2) cv2.rectangle(frame, (x1, y1 - th - 10), (x1 + tw, y1), (0, 255, 0), -1) cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2) det_count += 1 if conf > best_conf: best_conf = conf cx = (x1 + x2) / 2 cy = (y1 + y2) / 2 diameter = max(x2 - x1, y2 - y1) best_detection = (cx, cy, diameter, conf) # Update trajectory and check VAR if best_detection: px, py, diam, conf = best_detection pos_3d = cal.pixel_to_3d(px, py, diam) if pos_3d: trajectory.add_observation( pos_3d[0], pos_3d[1], pos_3d[2], now, frame_counts[sensor_id], sensor_id, conf ) # Check for close calls event = event_detector.check(trajectory) if event: print(f"[VAR] Close call! Line: {event['line']}, " f"Distance: {event['distance_m']*100:.0f}cm") state['events'].append(event) _capture_var_snapshot(frame, event) # FPS tracking frame_counts[sensor_id] += 1 elapsed = time.time() - start_times[sensor_id] fps_actual = frame_counts[sensor_id] / elapsed if elapsed > 0 else 0 # HUD overlay cv2.putText(frame, f"CAM {sensor_id} | FPS: {fps_actual:.1f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) if det_count > 0: cv2.putText(frame, f"Balls: {det_count}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) # Encode JPEG and update shared state _, jpeg = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80]) with cam['lock']: cam['frame'] = jpeg.tobytes() cam['fps'] = fps_actual cam['detections'] = det_count if frame_counts[sensor_id] % 150 == 0: speed = trajectory.get_speed() speed_str = f"{speed:.1f} m/s" if speed else "N/A" print(f"[CAM {sensor_id}] Frame {frame_counts[sensor_id]}, " f"FPS: {fps_actual:.1f}, Det: {det_count}, Speed: {speed_str}") def main(): global _cam_readers, _args parser = argparse.ArgumentParser(description='Pickle Vision Referee System') parser.add_argument('--width', type=int, default=1280) parser.add_argument('--height', type=int, default=720) parser.add_argument('--fps', type=int, default=30) parser.add_argument('--model', type=str, default='yolov8n.pt') parser.add_argument('--conf', type=float, default=0.25) parser.add_argument('--port', type=int, default=8080) parser.add_argument('--buffer-seconds', type=int, default=10, help='Ring buffer size in seconds for VAR clips') parser.add_argument('--calibration-dir', type=str, default=os.path.join(os.path.dirname(__file__), 'config'), help='Directory with calibration JSON files') args = parser.parse_args() _args = args # Load YOLO model print(f"Loading YOLO model: {args.model}") model = YOLO(args.model) try: model.to("cuda") print("Inference on CUDA") except Exception: print("CUDA unavailable, using CPU") # Initialize shared state state['trajectory'] = TrajectoryModel(fps=args.fps) state['event_detector'] = EventDetector(trigger_distance_m=0.3) state['calibration_dir'] = args.calibration_dir state['calibrate_fn'] = auto_calibrate ring_buffer = FrameRingBuffer(max_seconds=args.buffer_seconds, fps=args.fps) # Start with empty calibrators — user must calibrate via UI os.makedirs(args.calibration_dir, exist_ok=True) for sensor_id in [0, 1]: state['calibrators'][sensor_id] = CameraCalibrator() # Start camera readers cam_readers = {} for sensor_id in [0, 1]: state['cameras'][sensor_id] = { 'frame': None, 'lock': threading.Lock(), 'fps': 0, 'detections': 0 } cam_readers[sensor_id] = CameraReader(sensor_id, args.width, args.height, args.fps) _cam_readers = cam_readers # Wait for at least one camera print("Waiting for cameras...") for _ in range(100): if any(r.grab() is not None for r in cam_readers.values()): break time.sleep(0.1) # Start detection loop det_thread = threading.Thread( target=detection_loop, args=(cam_readers, model, args.conf, ring_buffer), daemon=True ) det_thread.start() time.sleep(2) print(f"\n{'=' * 50}") print(f" Pickle Vision Referee System") print(f" Open in browser: http://192.168.1.253:{args.port}") print(f"{'=' * 50}\n") app.run(host='0.0.0.0', port=args.port, threaded=True) if __name__ == '__main__': main()