#!/usr/bin/env python3
"""RTSP stream with YOLOv8 ball detection using a GStreamer pipeline.

Reads frames from a video source (file path or URL from argv[1]), runs
YOLOv8n detection restricted to the COCO "sports ball" class, overlays
bounding boxes and an FPS counter, and writes the annotated video to
/tmp/output_detection.mp4 via a GStreamer appsrc pipeline (falling back
to a plain OpenCV mp4v writer if GStreamer is unavailable). Runs for a
fixed 10-second test window or until Ctrl+C.
"""

import sys
import time

import cv2
from ultralytics import YOLO

BALL_CLASS_ID = 32  # COCO class id for "sports ball"
WIDTH = 1280
HEIGHT = 720
FPS = 25
PORT = 8554  # RTSP port (used by the rtspclientsink pipeline variant)
TEST_SECONDS = 10  # auto-stop after this many seconds of processing
MAX_READ_FAILURES = 3  # consecutive failed reads tolerated before giving up
OUTPUT_PATH = "/tmp/output_detection.mp4"
DEFAULT_SOURCE = (
    "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
)


def main():
    """Run ball detection over the source and write an annotated video.

    Returns:
        0 on success, 1 if the video source cannot be opened.
    """
    source = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_SOURCE

    print("Loading YOLOv8n model...")
    model = YOLO("yolov8n.pt")
    try:
        model.to("cuda")
        print("Using CUDA")
    except Exception:  # no CUDA device / torch built without CUDA support
        print("Using CPU")

    print(f"Opening: {source}")
    cap = cv2.VideoCapture(source)
    if not cap.isOpened():
        print("ERROR: Cannot open source")
        return 1

    # GStreamer appsrc -> x264 -> mp4 file pipeline.
    # NOTE: earlier revisions also defined (unused) rtspclientsink and UDP
    # multicast (mpegtsmux/udpsink) variants; swap the tail of this
    # pipeline to restore streaming output instead of a file.
    gst_file = (
        f"appsrc ! "
        f"video/x-raw,format=BGR,width={WIDTH},height={HEIGHT},framerate={FPS}/1 ! "
        f"videoconvert ! video/x-raw,format=I420 ! "
        f"x264enc tune=zerolatency ! "
        f"mp4mux ! "
        f"filesink location={OUTPUT_PATH}"
    )

    print("\nStarting detection stream...")
    print(f"Output: {OUTPUT_PATH}")
    print("Press Ctrl+C to stop\n")

    out = cv2.VideoWriter(gst_file, cv2.CAP_GSTREAMER, 0, FPS, (WIDTH, HEIGHT), True)
    if not out.isOpened():
        # GStreamer backend missing or pipeline rejected: fall back to the
        # default OpenCV writer so the test still produces a file.
        print("GStreamer writer failed, using regular file output")
        out = cv2.VideoWriter(
            OUTPUT_PATH,
            cv2.VideoWriter_fourcc(*"mp4v"),
            FPS,
            (WIDTH, HEIGHT),
        )

    frame_count = 0
    start = time.time()
    total_detections = 0
    read_failures = 0

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                # Loop file sources by rewinding; but bail out if rewinding
                # doesn't help (e.g. a dead live stream) instead of spinning
                # forever on failed reads.
                read_failures += 1
                if read_failures > MAX_READ_FAILURES:
                    print("Source exhausted, stopping")
                    break
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                continue
            read_failures = 0

            frame = cv2.resize(frame, (WIDTH, HEIGHT))

            # Detection, restricted to the sports-ball class.
            results = model(frame, verbose=False, classes=[BALL_CLASS_ID], conf=0.25)
            for r in results:
                for box in r.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    conf = float(box.conf[0])
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3)
                    cv2.putText(frame, f"Ball {conf:.2f}", (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                    total_detections += 1

            # FPS overlay (guard against elapsed == 0 on the first frame).
            frame_count += 1
            elapsed = time.time() - start
            fps = frame_count / elapsed if elapsed > 0 else 0.0
            cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)

            out.write(frame)

            if frame_count % 50 == 0:
                print(f"Frame {frame_count}, FPS: {fps:.1f}, "
                      f"Detections: {total_detections}")

            # Stop after the fixed test window.
            if elapsed > TEST_SECONDS:
                print("\n10 second test complete")
                break
    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        elapsed = time.time() - start
        print(f"\nProcessed {frame_count} frames in {elapsed:.1f}s")
        if elapsed > 0:
            print(f"Average FPS: {frame_count / elapsed:.1f}")
        print(f"Total detections: {total_detections}")
        print(f"Output saved to: {OUTPUT_PATH}")
        cap.release()
        out.release()

    return 0


if __name__ == "__main__":
    sys.exit(main())