"""Asset 1: Extract frames from video"""
from pathlib import Path
from typing import Dict

import cv2
from dagster import asset, AssetExecutionContext


@asset(
    io_manager_key="json_io_manager",
    compute_kind="opencv",
    description="Extract frames from video starting at specified second"
)
def extract_video_frames(context: AssetExecutionContext) -> Dict:
    """
    Extract a fixed number of consecutive frames from the DJI_0017.MP4 video.

    Inputs:
        - DJI_0017.MP4 (video file, resolved relative to the working directory)

    Outputs:
        - data/<run_id>/frames/frame_XXXX.jpg (up to 100 frames; fewer if the
          video ends early)
        - The returned dict is handed to the "json_io_manager" IO manager.
          NOTE(review): where that manager persists it is not visible here.

    Returns:
        Dict with:
        - frames_dir: path to the frames directory (as a string)
        - num_frames: number of frames actually written
        - fps: FPS reported by the video container
        - start_frame: index of the first extracted frame
        - start_sec: offset in seconds at which extraction starts

    Raises:
        RuntimeError: if the video cannot be opened or a frame cannot be
            written to disk.
    """
    # Configuration
    video_path = "DJI_0017.MP4"
    start_sec = 10
    num_frames = 100

    context.log.info(f"Opening video: {video_path}")
    cap = cv2.VideoCapture(video_path)

    # try/finally guarantees the capture handle is released even when opening,
    # directory creation, or frame writing fails part-way through.
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Could not open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        start_frame = int(start_sec * fps)

        context.log.info(f"Video info: {total_frames} frames, {fps} FPS")
        context.log.info(
            f"Extracting {num_frames} frames starting from frame "
            f"{start_frame} ({start_sec}s)"
        )

        # Create output directory with run_id so runs do not overwrite
        # each other's frames.
        run_id = context.run_id
        frames_dir = Path(f"data/{run_id}/frames")
        frames_dir.mkdir(parents=True, exist_ok=True)

        # Set starting position
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        # Extract frames
        extracted = 0
        for i in range(num_frames):
            ret, frame = cap.read()
            if not ret:
                # End of stream or decode failure: keep what we have so far.
                context.log.warning(
                    f"Could not read frame {i}. Stopping extraction."
                )
                break

            frame_path = frames_dir / f"frame_{i:04d}.jpg"
            # imwrite reports failure through its return value rather than an
            # exception; without this check the metadata would count frames
            # that never reached the disk.
            if not cv2.imwrite(str(frame_path), frame):
                raise RuntimeError(f"Could not write frame: {frame_path}")
            extracted += 1

            if (i + 1) % 20 == 0:
                context.log.info(f"Extracted {i + 1}/{num_frames} frames")
    finally:
        cap.release()

    context.log.info(f"✓ Extracted {extracted} frames to {frames_dir}")

    return {
        "frames_dir": str(frames_dir),
        "num_frames": extracted,
        "fps": fps,
        "start_frame": start_frame,
        "start_sec": start_sec,
    }