#!/usr/bin/env python3 """ Web-based real-time ball detection stream for Jetson Orin Nano. Dual CSI cameras via GStreamer nvarguscamerasrc → hardware ISP → YOLO → Web. Single detection loop alternates between cameras to avoid GIL issues. Usage: python3 web_detection_stream.py [--port 8080] """ import cv2 import time import argparse import threading import subprocess import numpy as np from flask import Flask, Response, render_template_string, jsonify from ultralytics import YOLO BALL_CLASS_ID = 32 # sports ball in COCO app = Flask(__name__) # Per-camera shared state cameras = {} HTML_PAGE = """ Pickle Vision - Live Detection

🏓 Pickle Vision

Dual camera real-time ball detection — Jetson Orin Nano Super
CAM 0
Camera 0
CAM 1
Camera 1
FPS: --
Model: YOLOv8n CUDA
Cameras: 2x CSI IMX219
""" class CameraReader: """Non-blocking camera frame reader using a background thread.""" def __init__(self, sensor_id, width, height, fps): self.sensor_id = sensor_id self.width = width self.height = height self.frame = None self.lock = threading.Lock() self.running = True gst_pipeline = ( f"gst-launch-1.0 --quiet -e " f"nvarguscamerasrc sensor-id={sensor_id} ! " f"'video/x-raw(memory:NVMM),width={width},height={height},framerate={fps}/1' ! " f"nvvidconv ! 'video/x-raw,format=BGRx,width={width},height={height}' ! " f"fdsink" ) print(f"[CAM {sensor_id}] Starting GStreamer: {width}x{height}@{fps}") self.proc = subprocess.Popen( gst_pipeline, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) self.frame_bytes = width * height * 4 time.sleep(2) if self.proc.poll() is not None: stderr = self.proc.stderr.read().decode() print(f"[CAM {sensor_id}] GStreamer failed: {stderr[:200]}") print(f"[CAM {sensor_id}] Falling back to V4L2") self.use_gst = False self.cap = cv2.VideoCapture(f"/dev/video{sensor_id}", cv2.CAP_V4L2) else: self.use_gst = True self.cap = None # Start reader thread self.thread = threading.Thread(target=self._read_loop, daemon=True) self.thread.start() def _read_loop(self): """Continuously read frames into self.frame.""" while self.running: if self.use_gst: raw = self.proc.stdout.read(self.frame_bytes) if len(raw) != self.frame_bytes: print(f"[CAM {self.sensor_id}] Pipe broken") break f = np.frombuffer(raw, dtype=np.uint8).reshape( self.height, self.width, 4)[:, :, :3].copy() else: ret, f = self.cap.read() if not ret: time.sleep(0.01) continue if f.shape[1] != self.width or f.shape[0] != self.height: f = cv2.resize(f, (self.width, self.height)) with self.lock: self.frame = f def grab(self): """Get latest frame (non-blocking).""" with self.lock: return self.frame.copy() if self.frame is not None else None def detection_loop(cam_readers, model, conf_threshold): """Single loop: alternate cameras, run YOLO, update JPEGs.""" frame_counts = {sid: 0 for sid in cam_readers} start_times = {sid: time.time() for sid in cam_readers} while True: for sensor_id, reader in cam_readers.items(): cam = cameras[sensor_id] frame = reader.grab() if frame is None: continue results = model(frame, verbose=False, classes=[BALL_CLASS_ID], conf=conf_threshold) det_count = 0 for r in results: for box in r.boxes: x1, y1, x2, y2 = map(int, box.xyxy[0]) conf = float(box.conf[0]) cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3) label = f"Ball {conf:.0%}" (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2) cv2.rectangle(frame, (x1, y1 - th - 10), (x1 + tw, y1), (0, 255, 0), -1) cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2) det_count += 1 frame_counts[sensor_id] += 1 elapsed = time.time() - start_times[sensor_id] fps_actual = frame_counts[sensor_id] / elapsed if elapsed > 0 else 0 cv2.putText(frame, f"CAM {sensor_id} | FPS: {fps_actual:.1f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) if det_count > 0: cv2.putText(frame, f"Balls: {det_count}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) _, jpeg = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80]) with cam["lock"]: cam["frame"] = jpeg.tobytes() cam["fps"] = fps_actual cam["detections"] = det_count if frame_counts[sensor_id] % 150 == 0: print(f"[CAM {sensor_id}] Frame {frame_counts[sensor_id]}, " f"FPS: {fps_actual:.1f}, Det: {det_count}") @app.route('/') def index(): return render_template_string(HTML_PAGE) @app.route('/frame/') def frame(sensor_id): if sensor_id not in cameras: return "Camera not found", 404 cam = cameras[sensor_id] with cam["lock"]: jpg = cam["frame"] if jpg is None: return "No frame yet", 503 return Response(jpg, mimetype='image/jpeg', headers={'Cache-Control': 'no-cache, no-store'}) @app.route('/api/stats') def api_stats(): return { str(k): {"fps": v["fps"], "detections": v["detections"]} for k, v in cameras.items() } def main(): parser = argparse.ArgumentParser(description='Pickle Vision Dual Camera Detection') parser.add_argument('--width', type=int, default=1280) parser.add_argument('--height', type=int, default=720) parser.add_argument('--fps', type=int, default=30) parser.add_argument('--model', type=str, default='yolov8n.pt') parser.add_argument('--conf', type=float, default=0.25) parser.add_argument('--port', type=int, default=8080) args = parser.parse_args() print(f"Loading YOLO model: {args.model}") model = YOLO(args.model) try: model.to("cuda") print("Inference on CUDA") except Exception: print("CUDA unavailable, using CPU") # Start camera readers cam_readers = {} for sensor_id in [0, 1]: cameras[sensor_id] = { "frame": None, "lock": threading.Lock(), "fps": 0, "detections": 0 } cam_readers[sensor_id] = CameraReader(sensor_id, args.width, args.height, args.fps) # Wait for at least one camera print("Waiting for cameras...") for _ in range(100): if any(r.grab() is not None for r in cam_readers.values()): break time.sleep(0.1) # Start detection loop in background det_thread = threading.Thread( target=detection_loop, args=(cam_readers, model, args.conf), daemon=True ) det_thread.start() # Wait for first detected frames time.sleep(2) print(f"\n{'=' * 50}") print(f" Open in browser: http://192.168.1.253:{args.port}") print(f"{'=' * 50}\n") app.run(host='0.0.0.0', port=args.port, threaded=True) if __name__ == '__main__': main()