91 lines
2.4 KiB
Python
91 lines
2.4 KiB
Python
"""Asset: Draw court polygon with 4 corners"""
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from pathlib import Path
|
|
from typing import Dict
|
|
from dagster import asset, AssetExecutionContext
|
|
|
|
|
|
@asset(
    io_manager_key="json_io_manager",
    compute_kind="opencv",
    description="Draw court polygon with 4 corners on first frame"
)
def visualize_ball_on_court(
    context: AssetExecutionContext,
    extract_video_frames: Dict,
    detect_court_keypoints: Dict
) -> Dict:
    """
    Draw the court polygon and its labelled corners on the first frame.

    Loads ``frame_0000.jpg`` from the frames directory, outlines the polygon
    described by the court corners, marks every corner with a filled circle
    and its index, and writes the annotated frame to disk.

    Inputs:
    - extract_video_frames: frame metadata; ``frames_dir`` is read from it
    - detect_court_keypoints: court keypoints; ``corners_pixel`` is read
      from it as a sequence of (x, y) pixel coordinates

    Outputs:
    - One image: data/{run_id}/court_polygon.jpg

    Returns:
    Dict with:
    - image_path: path to saved image

    Raises:
    - KeyError: if ``frames_dir`` or ``corners_pixel`` is missing
    - FileNotFoundError: if the first frame does not exist on disk
    - RuntimeError: if the frame cannot be decoded or the annotated image
      cannot be written
    """
    run_id = context.run_id
    frames_dir = Path(extract_video_frames['frames_dir'])

    # Load first frame
    first_frame_path = frames_dir / "frame_0000.jpg"
    context.log.info(f"Loading first frame: {first_frame_path}")

    if not first_frame_path.exists():
        raise FileNotFoundError(f"First frame not found: {first_frame_path}")

    # cv2.imread signals failure by returning None rather than raising.
    frame = cv2.imread(str(first_frame_path))
    if frame is None:
        raise RuntimeError(f"Failed to load frame: {first_frame_path}")

    # Get court corners; OpenCV drawing calls need int32 point arrays.
    corners = detect_court_keypoints['corners_pixel']
    court_polygon = np.array(corners, dtype=np.int32)

    context.log.info(f"Drawing court polygon with 4 corners: {corners}")

    # Draw court polygon outline
    cv2.polylines(
        frame,
        [court_polygon],
        isClosed=True,
        color=(0, 255, 0),  # Green
        thickness=3
    )

    # Draw court corners as filled circles, each labelled with its index
    for i, corner in enumerate(corners):
        x, y = int(corner[0]), int(corner[1])
        cv2.circle(
            frame,
            (x, y),
            8,
            (0, 255, 255),  # Yellow
            -1  # negative thickness fills the circle
        )
        # Offset the label to the right so it does not overlap the marker.
        cv2.putText(
            frame,
            str(i),
            (x + 12, y),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            (0, 255, 255),
            2
        )

    # Save image
    output_path = Path(f"data/{run_id}/court_polygon.jpg")
    # cv2.imwrite does not create missing directories, so make sure the
    # per-run output directory exists before writing.
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # cv2.imwrite reports failure through its return value; without this
    # check a failed write would still be logged and returned as a success.
    if not cv2.imwrite(str(output_path), frame):
        raise RuntimeError(f"Failed to write image: {output_path}")
    context.log.info(f"✓ Saved court polygon visualization to {output_path}")

    return {
        "image_path": str(output_path)
    }
|