#!/usr/bin/env python3
"""
Pickle Vision - Referee System main entry point for Jetson.
Dual CSI cameras, real-time YOLO detection, trajectory tracking, VAR triggers.
"""

import sys
import os
import cv2
import time
import base64
import argparse
import threading
import numpy as np

# Add project root to path so the `src.*` packages resolve when run as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from ultralytics import YOLO
from src.streaming.camera_reader import CameraReader
from src.streaming.ring_buffer import FrameRingBuffer
from src.physics.trajectory import TrajectoryModel
from src.physics.event_detector import EventDetector
from src.calibration.camera_calibrator import (
    CameraCalibrator, get_half_court_3d_points,
    COURT_LENGTH, COURT_WIDTH, HALF_COURT_LENGTH
)
from src.web.app import app, state

BALL_CLASS_ID = 32  # "sports ball" class index in the COCO label set

# Global references set in main(); auto_calibrate() is invoked later from the
# web app (via state['calibrate_fn']) and needs the live readers and args.
_cam_readers = {}
_args = None


def auto_calibrate():
    """One-click calibration: detect court corners in the current frame of
    each camera, compute the camera pose, and save it to the config dir.

    Each camera sees one half of the court from the net position.  Court-line
    detection (Canny + Hough) is attempted first; if it fails, corner
    positions are estimated from typical camera-at-net frame geometry.

    Returns:
        dict keyed by sensor id (as str):
        {'ok': True, 'camera_position': [...], 'corners_pixel': [...]} on
        success, {'ok': False, 'error': str} on failure.
    """
    results = {}

    for sensor_id, reader in _cam_readers.items():
        frame = reader.grab()
        if frame is None:
            results[str(sensor_id)] = {'ok': False, 'error': 'No frame available'}
            continue

        h, w = frame.shape[:2]
        side = 'left' if sensor_id == 0 else 'right'

        # Try to detect court lines using edge detection + Hough
        corners_pixel = _detect_court_corners(frame, side)

        if corners_pixel is None:
            # Fallback: estimate corners from typical camera-at-net perspective
            corners_pixel = _estimate_corners_from_net(w, h, side)

        # Get known 3D coordinates for this half of the court
        corners_3d = get_half_court_3d_points(side)

        cal = CameraCalibrator()
        ok = cal.calibrate(
            np.array(corners_pixel, dtype=np.float32),
            corners_3d,
            w, h
        )

        if ok:
            cal_path = os.path.join(_args.calibration_dir,
                                    f'cam{sensor_id}_calibration.json')
            os.makedirs(os.path.dirname(cal_path), exist_ok=True)
            cal.save(cal_path)

            state['calibrators'][sensor_id] = cal

            # Camera center in world coordinates: C = -R^T @ t
            cam_pos = (-cal.rotation_matrix.T @ cal.translation_vec).flatten()

            results[str(sensor_id)] = {
                'ok': True,
                'camera_position': cam_pos.tolist(),
                # Both detector paths return an ndarray; normalize to a list
                # so the result is JSON-serializable either way.
                'corners_pixel': np.asarray(corners_pixel).tolist(),
            }
            print(f"[CAM {sensor_id}] Calibrated! Camera at "
                  f"({cam_pos[0]:.1f}, {cam_pos[1]:.1f}, {cam_pos[2]:.1f})")
        else:
            results[str(sensor_id)] = {'ok': False, 'error': 'Calibration failed'}

    return results


def _detect_court_corners(frame, side):
    """Detect the four dominant court corners via Canny + HoughLinesP.

    Classifies detected segments into near-horizontal and near-vertical
    families, takes the most separated pair from each family, and returns
    their four intersection points.

    Returns:
        4x2 float32 ndarray of corner pixels (TL, TR, BR, BL), or None when
        not enough line structure is found.
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(blur, 50, 150)

    lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=80,
                            minLineLength=100, maxLineGap=20)

    if lines is None or len(lines) < 4:
        return None

    # Classify segments by angle (degrees, 0..180 after abs)
    horizontals = []
    verticals = []
    for line in lines:
        x1, y1, x2, y2 = line[0]
        angle = abs(np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi)
        if angle < 30 or angle > 150:
            horizontals.append(line[0])
        elif 60 < angle < 120:
            verticals.append(line[0])
        # segments between 30 and 60 degrees are ignored as diagonals

    if len(horizontals) < 2 or len(verticals) < 2:
        return None

    # Sort by midpoint position so the extremes are the outermost lines
    h_positions = sorted(horizontals, key=lambda l: (l[1] + l[3]) / 2)
    v_positions = sorted(verticals, key=lambda l: (l[0] + l[2]) / 2)

    # Most separated horizontal pair (top and bottom court lines)
    top_line = h_positions[0]
    bottom_line = h_positions[-1]

    # Most separated vertical pair (left and right sidelines)
    left_line = v_positions[0]
    right_line = v_positions[-1]

    def line_intersection(l1, l2):
        """Intersection of two infinite lines given as (x1,y1,x2,y2) segments."""
        x1, y1, x2, y2 = l1
        x3, y3, x4, y4 = l2
        denom = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
        if abs(denom) < 1e-6:
            return None  # parallel (or numerically so)
        t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / denom
        return [x1 + t * (x2 - x1), y1 + t * (y2 - y1)]

    corners = [
        line_intersection(top_line, left_line),      # TL
        line_intersection(top_line, right_line),     # TR
        line_intersection(bottom_line, right_line),  # BR
        line_intersection(bottom_line, left_line),   # BL
    ]

    if any(c is None for c in corners):
        return None

    return np.array(corners, dtype=np.float32)


def _estimate_corners_from_net(w, h, side):
    """Estimate court corner pixel positions for a camera mounted at the net.

    The camera looks down its half-court, so the far baseline appears narrow
    near the top of the frame while the near net line spans the bottom.

    Returned order matches get_half_court_3d_points(side):
    baseline-left, baseline-right, net-right, net-left.
    """
    # Near edge (bottom of frame, close to camera = at the net)
    near_left = [w * 0.05, h * 0.85]
    near_right = [w * 0.95, h * 0.85]

    # Far edge (top of frame, baseline = far from camera)
    far_left = [w * 0.20, h * 0.15]
    far_right = [w * 0.80, h * 0.15]

    # BUGFIX: the original had an if/else on `side` whose two branches were
    # byte-identical (dead branch) — collapsed here.
    # NOTE(review): the two cameras face opposite directions, so image
    # left/right for the 'right' camera is plausibly mirrored in world Y;
    # confirm this ordering against get_half_court_3d_points('right').
    return np.array([far_left, far_right, near_right, near_left], dtype=np.float32)


def _capture_var_snapshot(frame, event):
    """Render a VAR overlay onto a copy of `frame` and store it (base64 JPEG)
    in state['last_var'] together with the triggering event."""
    snapshot = frame.copy()
    cv2.putText(snapshot, "VAR DETECT", (10, 40),
                cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
    cv2.putText(snapshot, f"Line: {event['line']} Dist: {event['distance_m']*100:.0f}cm",
                (10, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)

    # BUGFIX: check imencode's success flag; don't publish a bogus snapshot.
    ok, jpeg = cv2.imencode('.jpg', snapshot, [cv2.IMWRITE_JPEG_QUALITY, 85])
    if not ok:
        return

    state['last_var'] = {
        'event': event,
        'snapshot_b64': base64.b64encode(jpeg.tobytes()).decode('ascii'),
    }


def detection_loop(cam_readers, model, conf_threshold, ring_buffer):
    """Main detection loop: alternate cameras, run YOLO, update shared state.

    For each camera in turn: grab the latest frame, push the raw frame into
    the VAR ring buffer, detect the ball, feed the best detection into the 3D
    trajectory model (when calibrated), check for close-call events, and
    publish an annotated JPEG for the web UI.  Runs forever (daemon thread).
    """
    frame_counts = {sid: 0 for sid in cam_readers}
    start_times = {sid: time.time() for sid in cam_readers}
    trajectory = state['trajectory']
    event_detector = state['event_detector']
    calibrators = state['calibrators']

    while True:
        for sensor_id, reader in cam_readers.items():
            cam = state['cameras'][sensor_id]

            frame = reader.grab()
            if frame is None:
                continue

            now = time.time()

            # Store raw (un-annotated) frame in ring buffer for VAR
            ring_buffer.push(frame, now, sensor_id)

            # YOLO detection, restricted to the sports-ball class
            results = model(frame, verbose=False, classes=[BALL_CLASS_ID], conf=conf_threshold)

            det_count = 0
            best_detection = None
            best_conf = 0

            for r in results:
                for box in r.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    conf = float(box.conf[0])
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3)
                    label = f"Ball {conf:.0%}"
                    (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)
                    cv2.rectangle(frame, (x1, y1 - th - 10), (x1 + tw, y1), (0, 255, 0), -1)
                    cv2.putText(frame, label, (x1, y1 - 5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
                    det_count += 1

                    # Track the single highest-confidence ball for the 3D model
                    if conf > best_conf:
                        best_conf = conf
                        cx = (x1 + x2) / 2
                        cy = (y1 + y2) / 2
                        diameter = max(x2 - x1, y2 - y1)
                        best_detection = (cx, cy, diameter, conf)

            # Update trajectory if we have calibration and a detection
            if best_detection and sensor_id in calibrators:
                cal = calibrators[sensor_id]
                if cal.calibrated:
                    px, py, diam, conf = best_detection
                    pos_3d = cal.pixel_to_3d(px, py, diam)
                    if pos_3d:
                        trajectory.add_observation(
                            pos_3d[0], pos_3d[1], pos_3d[2],
                            now, frame_counts[sensor_id], sensor_id, conf
                        )

            # Check for close calls
            event = event_detector.check(trajectory)
            if event:
                print(f"[VAR] Close call! Line: {event['line']}, "
                      f"Distance: {event['distance_m']*100:.0f}cm")
                state['events'].append(event)
                _capture_var_snapshot(frame, event)

            # FPS tracking (average over the whole run)
            frame_counts[sensor_id] += 1
            elapsed = time.time() - start_times[sensor_id]
            fps_actual = frame_counts[sensor_id] / elapsed if elapsed > 0 else 0

            # HUD overlay
            cv2.putText(frame, f"CAM {sensor_id} | FPS: {fps_actual:.1f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
            if det_count > 0:
                cv2.putText(frame, f"Balls: {det_count}", (10, 60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

            # Encode JPEG and update shared state
            # BUGFIX: check imencode's success flag before publishing bytes.
            ok, jpeg = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
            with cam['lock']:
                if ok:
                    cam['frame'] = jpeg.tobytes()
                cam['fps'] = fps_actual
                cam['detections'] = det_count

            if frame_counts[sensor_id] % 150 == 0:
                speed = trajectory.get_speed()
                speed_str = f"{speed:.1f} m/s" if speed else "N/A"
                print(f"[CAM {sensor_id}] Frame {frame_counts[sensor_id]}, "
                      f"FPS: {fps_actual:.1f}, Det: {det_count}, Speed: {speed_str}")


def main():
    """Parse CLI args, load the YOLO model, start cameras and the detection
    thread, then serve the web UI (blocking)."""
    global _cam_readers, _args

    parser = argparse.ArgumentParser(description='Pickle Vision Referee System')
    parser.add_argument('--width', type=int, default=1280)
    parser.add_argument('--height', type=int, default=720)
    parser.add_argument('--fps', type=int, default=30)
    parser.add_argument('--model', type=str, default='yolov8n.pt')
    parser.add_argument('--conf', type=float, default=0.25)
    parser.add_argument('--port', type=int, default=8080)
    parser.add_argument('--buffer-seconds', type=int, default=10,
                        help='Ring buffer size in seconds for VAR clips')
    parser.add_argument('--calibration-dir', type=str,
                        default=os.path.join(os.path.dirname(__file__), 'config'),
                        help='Directory with calibration JSON files')
    args = parser.parse_args()
    _args = args

    # Load YOLO model
    print(f"Loading YOLO model: {args.model}")
    model = YOLO(args.model)
    try:
        model.to("cuda")
        print("Inference on CUDA")
    except Exception:
        # No usable GPU: ultralytics falls back to CPU inference.
        print("CUDA unavailable, using CPU")

    # Initialize shared state used by the web app and the detection thread
    state['trajectory'] = TrajectoryModel(fps=args.fps)
    state['event_detector'] = EventDetector(trigger_distance_m=0.3)
    state['calibration_dir'] = args.calibration_dir
    state['calibrate_fn'] = auto_calibrate

    ring_buffer = FrameRingBuffer(max_seconds=args.buffer_seconds, fps=args.fps)

    # Load calibrations if available
    os.makedirs(args.calibration_dir, exist_ok=True)
    for sensor_id in [0, 1]:
        cal = CameraCalibrator()
        cal_path = os.path.join(args.calibration_dir, f'cam{sensor_id}_calibration.json')
        if os.path.exists(cal_path):
            if cal.load(cal_path):
                print(f"[CAM {sensor_id}] Loaded calibration from {cal_path}")
                state['calibrators'][sensor_id] = cal

    # Start camera readers (one per CSI sensor)
    cam_readers = {}
    for sensor_id in [0, 1]:
        state['cameras'][sensor_id] = {
            'frame': None, 'lock': threading.Lock(), 'fps': 0, 'detections': 0
        }
        cam_readers[sensor_id] = CameraReader(sensor_id, args.width, args.height, args.fps)

    _cam_readers = cam_readers

    # Wait (up to ~10 s) for at least one camera to deliver a frame
    print("Waiting for cameras...")
    for _ in range(100):
        if any(r.grab() is not None for r in cam_readers.values()):
            break
        time.sleep(0.1)

    # Start detection loop in the background; Flask serves in the main thread
    det_thread = threading.Thread(
        target=detection_loop,
        args=(cam_readers, model, args.conf, ring_buffer),
        daemon=True
    )
    det_thread.start()

    time.sleep(2)

    print(f"\n{'=' * 50}")
    print(f" Pickle Vision Referee System")
    # NOTE(review): hard-coded LAN address — only correct on this deployment's
    # network; consider deriving the host IP instead.
    print(f" Open in browser: http://192.168.1.253:{args.port}")
    print(f"{'=' * 50}\n")

    app.run(host='0.0.0.0', port=args.port, threaded=True)


if __name__ == '__main__':
    main()
"""
3D camera calibration using court geometry.
Extracted from dagster_project/assets/camera_calibration.py and coordinate_transform_3d.py.

For our setup: two cameras mounted at the net center, looking at opposite halves.
Each camera gets its own calibration.
"""

import cv2
import numpy as np
import json
from typing import Dict, Optional, Tuple

# Pickleball court dimensions (meters)
COURT_LENGTH = 13.4
COURT_WIDTH = 6.1
HALF_COURT_LENGTH = COURT_LENGTH / 2  # 6.7m
NET_HEIGHT = 0.914  # meters at center
BALL_DIAMETER = 0.074  # meters (74mm)


class CameraCalibrator:
    """3D camera calibration using solvePnP with known court geometry."""

    def __init__(self):
        self.camera_matrix = None     # 3x3 pinhole intrinsics (estimated, not measured)
        self.rotation_vec = None      # Rodrigues rotation vector (world -> camera)
        self.translation_vec = None   # translation vector (world -> camera)
        self.rotation_matrix = None   # 3x3 matrix form of rotation_vec
        self.homography = None        # pixel -> ground plane (Z=0) homography
        self.calibrated = False

    def calibrate(self, court_corners_pixel: np.ndarray,
                  court_corners_3d: np.ndarray,
                  image_width: int, image_height: int) -> bool:
        """Calibrate camera pose using court corner correspondences.

        Args:
            court_corners_pixel: Nx2 array of corner positions in pixels (N >= 4)
            court_corners_3d: Nx3 array of corresponding 3D world coordinates
            image_width, image_height: frame dimensions

        Returns:
            True on success; False on bad input or a solver failure.
        """
        # BUGFIX: validate explicitly instead of relying on a cv2 exception.
        if len(court_corners_pixel) < 4 or len(court_corners_3d) < 4:
            print("Calibration needs at least 4 point correspondences")
            return False

        # Approximate intrinsics: focal length ~ the larger image dimension,
        # principal point at the image center, zero lens distortion.
        focal_length = float(max(image_width, image_height))
        cx, cy = image_width / 2.0, image_height / 2.0

        self.camera_matrix = np.array([
            [focal_length, 0, cx],
            [0, focal_length, cy],
            [0, 0, 1]
        ], dtype=np.float32)

        try:
            success, self.rotation_vec, self.translation_vec = cv2.solvePnP(
                court_corners_3d.astype(np.float32),
                court_corners_pixel.astype(np.float32),
                self.camera_matrix,
                None,  # distortion coefficients: assume none
                flags=cv2.SOLVEPNP_ITERATIVE
            )
            if not success:
                return False

            self.rotation_matrix, _ = cv2.Rodrigues(self.rotation_vec)

            # Build ground plane homography (Z=0).
            # GENERALIZED: the original always used getPerspectiveTransform on
            # the first 4 points, discarding any extra correspondences; with
            # more than 4 points we now do a least-squares fit over all of them
            # (identical result when exactly 4 are supplied).
            ground_2d = court_corners_3d[:, :2].astype(np.float32)
            pix = court_corners_pixel.astype(np.float32)
            if len(pix) == 4:
                self.homography = cv2.getPerspectiveTransform(pix, ground_2d)
            else:
                self.homography, _ = cv2.findHomography(pix, ground_2d)
                if self.homography is None:
                    return False

            self.calibrated = True
            return True
        except Exception as e:
            print(f"Calibration failed: {e}")
            return False

    def pixel_to_ground(self, px: float, py: float) -> Optional[Tuple[float, float]]:
        """Project a pixel onto the ground plane (Z=0) using the homography.

        Returns (X, Y) world meters, or None before calibration.
        """
        if self.homography is None:
            return None
        pt = np.array([[[px, py]]], dtype=np.float32)
        result = cv2.perspectiveTransform(pt, self.homography)
        return float(result[0][0][0]), float(result[0][0][1])

    def pixel_to_3d(self, px: float, py: float,
                    ball_diameter_px: float = 20.0) -> Optional[Tuple[float, float, float]]:
        """Estimate the ball's 3D world position from one detection.

        Heuristic: a large apparent diameter (>30 px) is treated as
        on-the-ground (homography projection); otherwise the distance along
        the pixel's viewing ray is inferred from the apparent ball size.

        Returns (X, Y, Z) in meters, or None when not calibrated / implausible.
        """
        if not self.calibrated:
            return None

        # Large apparent ball -> assume it is on the ground
        if ball_diameter_px > 30:
            ground = self.pixel_to_ground(px, py)
            if ground:
                return (ground[0], ground[1], 0.0)
            return None

        fx = self.camera_matrix[0, 0]
        fy = self.camera_matrix[1, 1]
        cx = self.camera_matrix[0, 2]
        cy = self.camera_matrix[1, 2]
        focal = (fx + fy) / 2

        # Unit viewing ray in camera coordinates
        ray_cam = np.array([
            (px - cx) / fx,
            (py - cy) / fy,
            1.0
        ])
        ray_cam /= np.linalg.norm(ray_cam)

        # Rotate the ray into world coordinates (R maps world->camera,
        # so R^T maps camera->world).
        ray_world = self.rotation_matrix.T @ ray_cam

        # Pinhole range-from-size: distance = real_diameter * focal / pixel_diameter
        # (max(..., 1.0) guards against a degenerate zero-size detection).
        distance = (BALL_DIAMETER * focal) / max(ball_diameter_px, 1.0)

        # Camera center in world coordinates: C = -R^T @ t
        cam_pos = (-self.rotation_matrix.T @ self.translation_vec).flatten()

        point_3d = cam_pos + ray_world * distance
        x, y, z = point_3d

        # Sanity check: fall back to the ground projection for implausible heights
        if z < 0 or z > 10:
            ground = self.pixel_to_ground(px, py)
            if ground:
                return (ground[0], ground[1], 0.0)
            return None

        return (float(x), float(y), float(z))

    def save(self, filepath: str):
        """Serialize the calibration to a JSON file (no-op when uncalibrated)."""
        if not self.calibrated:
            return
        data = {
            'camera_matrix': self.camera_matrix.tolist(),
            'rotation_vector': self.rotation_vec.flatten().tolist(),
            'translation_vector': self.translation_vec.flatten().tolist(),
            'rotation_matrix': self.rotation_matrix.tolist(),
            'homography': self.homography.tolist() if self.homography is not None else None,
        }
        with open(filepath, 'w') as f:
            json.dump(data, f, indent=2)

    def load(self, filepath: str) -> bool:
        """Restore a calibration previously written by save().

        Returns True on success; False (with a message) on any failure.
        """
        try:
            with open(filepath, 'r') as f:
                data = json.load(f)
            self.camera_matrix = np.array(data['camera_matrix'], dtype=np.float32)
            self.rotation_vec = np.array(data['rotation_vector'], dtype=np.float32).reshape(3, 1)
            self.translation_vec = np.array(data['translation_vector'], dtype=np.float32).reshape(3, 1)
            self.rotation_matrix = np.array(data['rotation_matrix'], dtype=np.float32)
            if data.get('homography'):
                self.homography = np.array(data['homography'], dtype=np.float32)
            self.calibrated = True
            return True
        except Exception as e:
            print(f"Failed to load calibration: {e}")
            return False


def get_half_court_3d_points(side: str) -> np.ndarray:
    """Get 3D coordinates for one half of the court.

    Args:
        side: 'left' (CAM 0, X: 0 to 6.7m) or 'right' (CAM 1, X: 6.7 to 13.4m)

    Returns:
        4x3 float32 array of court corners in world coords
        (X=along court, Y=across, Z=up), ordered baseline-left,
        baseline-right, net-right, net-left.
    """
    if side == 'left':
        return np.array([
            [0, 0, 0],                             # baseline left
            [0, COURT_WIDTH, 0],                   # baseline right
            [HALF_COURT_LENGTH, COURT_WIDTH, 0],   # net right
            [HALF_COURT_LENGTH, 0, 0],             # net left
        ], dtype=np.float32)
    else:
        return np.array([
            [COURT_LENGTH, 0, 0],                  # baseline left
            [COURT_LENGTH, COURT_WIDTH, 0],        # baseline right
            [HALF_COURT_LENGTH, COURT_WIDTH, 0],   # net right
            [HALF_COURT_LENGTH, 0, 0],             # net left
        ], dtype=np.float32)
+""" + +import time +from typing import Optional, Tuple, List, Dict +from .trajectory import TrajectoryModel + +# Court line definitions in meters (X=along court, Y=across court) +# Full court: 13.4 x 6.1 +COURT_LENGTH = 13.4 +COURT_WIDTH = 6.1 +HALF = COURT_LENGTH / 2 # 6.7 (net / center line) +NVZ = 2.13 # non-volley zone (kitchen) depth from net + +COURT_LINES = { + 'baseline_left': {'type': 'horizontal', 'x': 0, 'y_start': 0, 'y_end': COURT_WIDTH}, + 'baseline_right': {'type': 'horizontal', 'x': COURT_LENGTH, 'y_start': 0, 'y_end': COURT_WIDTH}, + 'sideline_near': {'type': 'vertical', 'y': 0, 'x_start': 0, 'x_end': COURT_LENGTH}, + 'sideline_far': {'type': 'vertical', 'y': COURT_WIDTH, 'x_start': 0, 'x_end': COURT_LENGTH}, + 'centerline': {'type': 'horizontal', 'x': HALF, 'y_start': 0, 'y_end': COURT_WIDTH}, + 'kitchen_left': {'type': 'horizontal', 'x': HALF - NVZ, 'y_start': 0, 'y_end': COURT_WIDTH}, + 'kitchen_right': {'type': 'horizontal', 'x': HALF + NVZ, 'y_start': 0, 'y_end': COURT_WIDTH}, + 'center_service_left': {'type': 'vertical', 'y': COURT_WIDTH / 2, + 'x_start': 0, 'x_end': HALF - NVZ}, + 'center_service_right': {'type': 'vertical', 'y': COURT_WIDTH / 2, + 'x_start': HALF + NVZ, 'x_end': COURT_LENGTH}, +} + + +class EventDetector: + """Detects close calls by monitoring ball trajectory near court lines.""" + + def __init__(self, trigger_distance_m=0.3, cooldown_seconds=5.0): + self.trigger_distance = trigger_distance_m + self.cooldown = cooldown_seconds + self.last_trigger_time = 0.0 + self.events: List[Dict] = [] + + def check(self, trajectory: TrajectoryModel) -> Optional[Dict]: + """Check if current trajectory warrants a VAR trigger. + + Conditions: + 1. Ball is descending (vz < 0) or near ground (z < 0.3m) + 2. Predicted landing point is within trigger_distance of a line + 3. 
Cooldown has passed since last trigger + """ + now = time.time() + if now - self.last_trigger_time < self.cooldown: + return None + + if not trajectory.points or trajectory.velocity is None: + return None + + last = trajectory.points[-1] + vz = trajectory.velocity[2] + + # Ball must be descending or near ground + if vz > 0 and last.z > 0.5: + return None + + # Get predicted landing + landing = trajectory.predict_landing() + if landing is None: + return None + + land_x, land_y, time_to_land = landing + + # Check distance to each line + closest_line = None + closest_dist = float('inf') + + for name, line in COURT_LINES.items(): + dist = self._distance_to_line(land_x, land_y, line) + if dist < closest_dist: + closest_dist = dist + closest_line = name + + if closest_dist <= self.trigger_distance: + self.last_trigger_time = now + event = { + 'type': 'close_call', + 'timestamp': now, + 'line': closest_line, + 'distance_m': closest_dist, + 'landing_x': land_x, + 'landing_y': land_y, + 'time_to_land': time_to_land, + 'ball_z': last.z, + 'ball_speed': trajectory.get_speed(), + } + self.events.append(event) + return event + + return None + + def _distance_to_line(self, x: float, y: float, line: Dict) -> float: + if line['type'] == 'horizontal': + lx = line['x'] + y_start, y_end = line['y_start'], line['y_end'] + if y_start <= y <= y_end: + return abs(x - lx) + return float('inf') + else: # vertical + ly = line['y'] + x_start, x_end = line['x_start'], line['x_end'] + if x_start <= x <= x_end: + return abs(y - ly) + return float('inf') + + def is_in_bounds(self, x: float, y: float) -> bool: + return 0 <= x <= COURT_LENGTH and 0 <= y <= COURT_WIDTH diff --git a/src/physics/trajectory.py b/src/physics/trajectory.py new file mode 100644 index 0000000..41a6f13 --- /dev/null +++ b/src/physics/trajectory.py @@ -0,0 +1,190 @@ +""" +Physical trajectory model for pickleball. +Accumulates ball positions across frames, fits physics-based curves, +smooths noise, and predicts ball path. 
"""
Physical trajectory model for pickleball.
Accumulates ball positions across frames, fits physics-based curves,
smooths noise, and predicts ball path.

Key physics:
- Gravity: 9.81 m/s^2
- Pickleball drag coefficient: ~0.4 (perforated ball)
- Typical serve speed: 10-25 m/s
- Bounce coefficient of restitution: ~0.4
"""

import numpy as np
from collections import deque
from typing import List, Optional, Tuple, Dict
import time

GRAVITY = 9.81  # m/s^2
BALL_MASS = 0.026  # kg
BALL_RADIUS = 0.037  # meters
DRAG_COEFF = 0.4
AIR_DENSITY = 1.225  # kg/m^3
CROSS_SECTION = np.pi * BALL_RADIUS ** 2
BOUNCE_COR = 0.4  # coefficient of restitution


class TrajectoryPoint:
    """One ball observation in world coordinates (possibly interpolated)."""

    __slots__ = ('x', 'y', 'z', 'timestamp', 'frame', 'sensor_id',
                 'confidence', 'interpolated')

    def __init__(self, x, y, z, timestamp, frame, sensor_id,
                 confidence=1.0, interpolated=False):
        self.x = x
        self.y = y
        self.z = z
        self.timestamp = timestamp
        self.frame = frame
        self.sensor_id = sensor_id
        self.confidence = confidence
        self.interpolated = interpolated

    def to_dict(self):
        """Serialize every slot into a plain dict."""
        return {name: getattr(self, name) for name in self.__slots__}


class TrajectoryModel:
    """Accumulates ball observations and builds a physics-consistent trajectory."""

    def __init__(self, max_points=300, fps=30):
        # Rolling window of observations; deque evicts the oldest automatically.
        self.points: deque = deque(maxlen=max_points)
        self.fps = fps
        self.velocity = None  # latest (vx, vy, vz) estimate in m/s, or None
        self._last_update = 0.0

    def add_observation(self, x: float, y: float, z: float,
                        timestamp: float, frame: int, sensor_id: int,
                        confidence: float = 1.0):
        """Record a new observed ball position and refresh the velocity estimate."""
        self.points.append(
            TrajectoryPoint(x, y, z, timestamp, frame, sensor_id, confidence))
        self._update_velocity()

    def _update_velocity(self):
        """Finite-difference velocity from the two newest trusted observations.

        Interpolated or low-confidence (<= 0.3) points are ignored.
        """
        trusted = [pt for pt in self.points
                   if not pt.interpolated and pt.confidence > 0.3]
        if len(trusted) < 2:
            self.velocity = None
            return

        prev, curr = trusted[-2:]
        dt = curr.timestamp - prev.timestamp
        if dt <= 0:
            # Duplicate or out-of-order timestamp: keep the previous estimate.
            return

        self.velocity = np.array([curr.x - prev.x,
                                  curr.y - prev.y,
                                  curr.z - prev.z]) / dt

    def predict_position(self, dt_ahead: float) -> Optional[Tuple[float, float, float]]:
        """Predict the ball position dt_ahead seconds after the last observation.

        Simple ballistic model: constant horizontal velocity (scaled by a
        crude linearized drag factor) plus gravity on the vertical axis.
        """
        if not self.points or self.velocity is None:
            return None

        origin = self.points[-1]
        vx, vy, vz = self.velocity

        # Linearized quadratic-drag correction applied to the displacement.
        speed = np.sqrt(vx ** 2 + vy ** 2 + vz ** 2)
        if speed > 0:
            drag_decel = (0.5 * DRAG_COEFF * AIR_DENSITY * CROSS_SECTION * speed ** 2) / BALL_MASS
            drag_factor = max(0, 1 - (drag_decel / speed) * dt_ahead)
        else:
            drag_factor = 1.0

        px = origin.x + vx * dt_ahead * drag_factor
        py = origin.y + vy * dt_ahead * drag_factor
        pz = origin.z + vz * dt_ahead - 0.5 * GRAVITY * dt_ahead ** 2

        if pz < 0:
            # Crude ground bounce: reflect with the restitution coefficient.
            pz = abs(pz) * BOUNCE_COR

        return (px, py, max(pz, 0))

    def predict_landing(self) -> Optional[Tuple[float, float, float]]:
        """Predict where the ball hits the ground (Z=0).

        Returns (x, y, time_to_landing) or None when it cannot be solved.
        """
        if not self.points or self.velocity is None:
            return None

        origin = self.points[-1]
        vz = self.velocity[2]

        if origin.z <= 0.05:
            # Effectively already on the ground.
            return (origin.x, origin.y, 0.0)

        # Impact time from 0.5*g*t^2 - vz*t - z = 0 (quadratic in t).
        a = 0.5 * GRAVITY
        b = -vz
        c = -origin.z

        discriminant = b ** 2 - 4 * a * c
        if discriminant < 0:
            return None

        root = np.sqrt(discriminant)
        t = (-b + root) / (2 * a)
        if t < 0:
            # Take the other root when the first lies in the past.
            t = (-b - root) / (2 * a)
        if t < 0:
            return None

        landing = self.predict_position(t)
        return (landing[0], landing[1], t) if landing else None

    def get_smoothed_trajectory(self, window=5) -> List[Dict]:
        """Trajectory dicts with a centered moving average over real points.

        Adds x_smooth/y_smooth/z_smooth keys; interpolated points are excluded
        from every window average.  Falls back to raw dicts when there are
        fewer points than the window size.
        """
        pts = list(self.points)
        if len(pts) < window:
            return [p.to_dict() for p in pts]

        half = window // 2
        smoothed = []
        for i, p in enumerate(pts):
            lo = max(0, i - half)
            hi = min(len(pts), i + half + 1)
            neighbors = [q for q in pts[lo:hi] if not q.interpolated]

            entry = p.to_dict()
            if neighbors:
                entry['x_smooth'] = np.mean([q.x for q in neighbors])
                entry['y_smooth'] = np.mean([q.y for q in neighbors])
                entry['z_smooth'] = np.mean([q.z for q in neighbors])
            smoothed.append(entry)

        return smoothed

    def get_recent(self, n=60) -> List[Dict]:
        """Last n trajectory points, serialized to dicts."""
        return [p.to_dict() for p in list(self.points)[-n:]]

    def get_speed(self) -> Optional[float]:
        """Current ball speed in m/s, or None before two observations."""
        return None if self.velocity is None else float(np.linalg.norm(self.velocity))

    def clear(self):
        """Reset all accumulated trajectory state."""
        self.points.clear()
        self.velocity = None
"""
Non-blocking camera frame reader using GStreamer subprocess.
Extracted from web_detection_stream.py for reuse across the system.
"""

import cv2
import time
import subprocess
import threading
import numpy as np


class CameraReader:
    """Non-blocking camera frame reader using a background thread.

    Prefers a GStreamer pipeline (nvarguscamerasrc — presumably a Jetson CSI
    sensor; confirm on target hardware) streaming raw BGRx over stdout, and
    falls back to V4L2 via OpenCV when the pipeline fails to start.
    """

    def __init__(self, sensor_id, width=1280, height=720, fps=30):
        self.sensor_id = sensor_id
        self.width = width
        self.height = height
        self.frame = None          # latest BGR frame, guarded by self.lock
        self.lock = threading.Lock()
        self.running = True

        # NOTE: shell=True is required for the quoted caps strings; all
        # interpolated values are ints, so there is no injection surface.
        gst_pipeline = (
            f"gst-launch-1.0 --quiet -e "
            f"nvarguscamerasrc sensor-id={sensor_id} ! "
            f"'video/x-raw(memory:NVMM),width={width},height={height},framerate={fps}/1' ! "
            f"nvvidconv ! 'video/x-raw,format=BGRx,width={width},height={height}' ! "
            f"fdsink"
        )
        print(f"[CAM {sensor_id}] Starting GStreamer: {width}x{height}@{fps}")
        self.proc = subprocess.Popen(
            gst_pipeline, shell=True,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        self.frame_bytes = width * height * 4  # BGRx = 4 bytes per pixel

        # Give the pipeline time to negotiate caps (or die immediately).
        time.sleep(2)
        if self.proc.poll() is not None:
            stderr = self.proc.stderr.read().decode()
            print(f"[CAM {sensor_id}] GStreamer failed: {stderr[:200]}")
            print(f"[CAM {sensor_id}] Falling back to V4L2")
            self.use_gst = False
            self.cap = cv2.VideoCapture(f"/dev/video{sensor_id}", cv2.CAP_V4L2)
        else:
            self.use_gst = True
            self.cap = None

        self.thread = threading.Thread(target=self._read_loop, daemon=True)
        self.thread.start()

    def _read_loop(self):
        """Background thread: keep self.frame updated with the newest frame."""
        while self.running:
            if self.use_gst:
                raw = self.proc.stdout.read(self.frame_bytes)
                if len(raw) != self.frame_bytes:
                    # Short read means EOF: pipeline died, stop the loop.
                    print(f"[CAM {self.sensor_id}] Pipe broken")
                    break
                # Drop the padding X channel; copy so the array owns writable
                # memory independent of the pipe buffer.
                f = np.frombuffer(raw, dtype=np.uint8).reshape(
                    self.height, self.width, 4)[:, :, :3].copy()
            else:
                ret, f = self.cap.read()
                if not ret:
                    time.sleep(0.01)
                    continue
                if f.shape[1] != self.width or f.shape[0] != self.height:
                    f = cv2.resize(f, (self.width, self.height))

            with self.lock:
                self.frame = f

    def grab(self):
        """Return a copy of the latest frame, or None if nothing read yet."""
        with self.lock:
            return self.frame.copy() if self.frame is not None else None

    def stop(self):
        """Stop reading, release the video source, and join the thread."""
        self.running = False
        if self.use_gst and self.proc.poll() is None:
            self.proc.terminate()
            # FIX: reap the child so it does not linger as a zombie process.
            try:
                self.proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                self.proc.kill()
        elif self.cap:
            self.cap.release()
        # FIX: join the reader thread (bounded wait; it is a daemon thread)
        # so no read races the released source after stop() returns.
        self.thread.join(timeout=2)


# ---- (diff metadata) new file: src/streaming/ring_buffer.py ----
"""
Ring buffer for storing raw video frames.
Used to cut VAR clips when a close call is triggered.
"""

import threading
import numpy as np
from collections import deque
from typing import Optional, List, Tuple


class FrameRingBuffer:
    """Thread-safe ring buffer that stores the last N seconds of raw frames."""

    def __init__(self, max_seconds=10, fps=30):
        # FIX: coerce to int — deque(maxlen=...) requires an integer, so a
        # fractional max_seconds (e.g. 7.5) used to raise TypeError.
        self.max_frames = int(max_seconds * fps)
        self.fps = fps
        self.buffer = deque(maxlen=self.max_frames)
        self.lock = threading.Lock()
        self.frame_count = 0  # monotonically increasing global frame index

    def push(self, frame: np.ndarray, timestamp: float, sensor_id: int):
        """Append a frame copy; the oldest entry falls off automatically."""
        with self.lock:
            self.buffer.append((frame.copy(), timestamp, sensor_id, self.frame_count))
            self.frame_count += 1

    def get_clip(self, seconds_before: float, seconds_after: float = 0) -> List[Tuple]:
        """Extract a clip from the buffer, relative to the newest timestamp.

        Returns list of (frame, timestamp, sensor_id, frame_number) tuples.
        """
        with self.lock:
            if not self.buffer:
                return []

            latest_ts = self.buffer[-1][1]
            start_ts = latest_ts - seconds_before
            end_ts = latest_ts + seconds_after

            return [
                entry for entry in self.buffer
                if start_ts <= entry[1] <= end_ts
            ]

    def get_last_n_frames(self, n: int, sensor_id: Optional[int] = None) -> List[Tuple]:
        """Last n buffered entries, optionally filtered to one sensor."""
        with self.lock:
            frames = list(self.buffer)
            if sensor_id is not None:
                frames = [f for f in frames if f[2] == sensor_id]
            return frames[-n:]

    def __len__(self):
        with self.lock:
            return len(self.buffer)


# ---- (diff metadata) new file: src/web/__init__.py (empty) ----
# ---- (diff metadata) new file: src/web/app.py ----
"""
Flask web application for Pickle Vision referee system.
Three tabs: Detection, Court, Trajectory.
"""

import os
import cv2
import time
import json
import base64
import threading
import numpy as np
from pathlib import Path
from flask import Flask, Response, render_template, jsonify, request

app = Flask(__name__, template_folder=os.path.join(os.path.dirname(__file__), 'templates'))

# Global state — set by main.py before starting
state = {
    'cameras': {},          # sensor_id -> {frame: bytes, lock, fps, detections}
    'trajectory': None,     # TrajectoryModel instance
    'event_detector': None,
    'calibrators': {},      # sensor_id -> CameraCalibrator
    'calibration_dir': '',  # path to save calibration files
    'events': [],           # recent VAR events
    'last_var': None,       # last VAR event with snapshot
    'calibrate_fn': None,   # callback for calibration trigger
}


@app.route('/')
def index():
    return render_template('index.html')


# FIX: the rule must capture sensor_id with an int converter — a bare
# '/frame/' rule cannot supply the view's required argument, so every
# request to this endpoint failed.
@app.route('/frame/<int:sensor_id>')
def frame(sensor_id):
    """Serve the latest encoded frame for one camera."""
    cam = state['cameras'].get(sensor_id)
    if not cam:
        return "Camera not found", 404
    with cam['lock']:
        jpg = cam.get('frame')
        if jpg is None:
            return "No frame yet", 503
        return Response(jpg,
                        mimetype='image/jpeg',
                        headers={'Cache-Control': 'no-cache, no-store'})


@app.route('/api/stats')
def api_stats():
    """Per-camera FPS and detection counts."""
    result = {}
    for k, v in state['cameras'].items():
        result[str(k)] = {
            'fps': v.get('fps', 0),
            'detections': v.get('detections', 0),
        }
    return jsonify(result)


@app.route('/api/trajectory')
def api_trajectory():
    """Recent trajectory points, current speed, and predicted landing."""
    traj = state.get('trajectory')
    if traj is None:
        return jsonify({'points': [], 'speed': None, 'landing': None})

    points = traj.get_recent(120)
    speed = traj.get_speed()
    landing = traj.predict_landing()

    return jsonify({
        'points': points,
        'speed': speed,
        'landing': {'x': landing[0], 'y': landing[1], 't': landing[2]} if landing else None,
    })


@app.route('/api/events')
def api_events():
    # Only the 20 most recent events are exposed.
    return jsonify(state.get('events', [])[-20:])


@app.route('/api/var/last')
def api_var_last():
    """Get last VAR event with snapshot image."""
    last = state.get('last_var')
    if not last:
        return jsonify(None)

    result = {
        'event': last['event'],
        'ago_seconds': time.time() - last['event']['timestamp'],
        'snapshot_b64': last.get('snapshot_b64'),
    }
    return jsonify(result)


@app.route('/api/calibration/status')
def api_calibration_status():
    """Map of sensor_id -> calibrated flag."""
    cals = {}
    for sid, cal in state.get('calibrators', {}).items():
        cals[str(sid)] = cal.calibrated
    return jsonify(cals)


@app.route('/api/calibration/trigger', methods=['POST'])
def api_calibration_trigger():
    """Trigger one-click court calibration from current camera frames."""
    fn = state.get('calibrate_fn')
    if fn is None:
        return jsonify({'ok': False, 'error': 'Calibration not available'}), 500

    try:
        result = fn()
        return jsonify({'ok': True, 'result': result})
    except Exception as e:
        # Boundary handler: report the failure to the client instead of 500-ing.
        return jsonify({'ok': False, 'error': str(e)}), 500


@app.route('/api/calibration/data')
def api_calibration_data():
    """Return calibration data for 3D scene reconstruction."""
    cals = state.get('calibrators', {})
    result = {}
    for sid, cal in cals.items():
        if not cal.calibrated:
            continue
        # Camera position in world coordinates: C = -R^T t.
        cam_pos = (-cal.rotation_matrix.T @ cal.translation_vec).flatten()
        # Camera look direction (Z axis of camera in world coords)
        look_dir = cal.rotation_matrix.T @ np.array([0, 0, 1.0])

        result[str(sid)] = {
            'position': cam_pos.tolist(),
            'look_direction': look_dir.tolist(),
            'focal_length': float(cal.camera_matrix[0, 0]),
            # Approximates image size as 2x the principal point — assumes the
            # principal point sits at the image centre (TODO confirm).
            'image_size': [int(cal.camera_matrix[0, 2] * 2),
                           int(cal.camera_matrix[1, 2] * 2)],
        }
    return jsonify(result)


def run(host='0.0.0.0', port=8080):
    """Start the Flask server (threaded so multiple clients can poll frames)."""
    app.run(host=host, port=port, threaded=True)

# ---- (diff metadata) new file: src/web/templates/index.html follows ----
+ +
+ + + +
+
+
CAM0: -- fps
+
CAM1: -- fps
+
+
+ +
VAR: Close Call
+ + +
+
+
+
CAM 0 - Left Court
+ Camera 0 +
+
+
CAM 1 - Right Court
+ Camera 1 +
+
+
+ + +
+
+ +
+ CAM 0: not calibrated | + CAM 1: not calibrated +
+
+
+
+
CAM 0 - Court Lines
+ Camera 0 +
+
+
CAM 1 - Court Lines
+ Camera 1 +
+
+
+
+ + +
+
+
+
CAM 0 - Tracking
+ Camera 0 +
+
+
CAM 1 - Tracking
+ Camera 1 +
+
+
+ +
+
+
+
VAR
+
+
+
No events
+
+
+
+ +
No snapshot
+
+
+ +
+
Speed:--
+
Height:--
+
Landing:--
+
Events:0
+
+ + + +