|
|
|
""" |
|
RF-DETR SoccerNet Inference - Professional Hugging Face Integration

A state-of-the-art RF-DETR-Large model fine-tuned on the SoccerNet-Tracking dataset
for detecting objects in soccer videos. Returns detections as a pandas DataFrame.

Classes: ball, player, referee, goalkeeper
Performance: 85.7% mAP@50, 49.8% mAP
|
""" |
|
|
|
import cv2 |
|
import pandas as pd |
|
import numpy as np |
|
import torch |
|
from rfdetr import RFDETRBase |
|
from PIL import Image |
|
from typing import Union, Optional, List, Dict, Tuple |
|
import os |
|
from tqdm import tqdm |
|
import time |
|
import json |
|
from pathlib import Path |
|
import warnings |
|
|
|
|
|
# Silence all Python warnings process-wide so that library chatter does not
# interleave with the progress output printed by this module.
# NOTE(review): this is a global side effect of importing the module and also
# hides deprecation warnings in the importing application - confirm it is wanted.
warnings.filterwarnings("ignore")
|
|
|
|
|
class RFDETRSoccerNet:
    """
    RF-DETR model trained on SoccerNet dataset for soccer video analysis.
    Returns detections as pandas DataFrame with comprehensive metadata.

    Performance:
        - mAP@50: 85.7%
        - mAP: 49.8%
        - Classes: ball, player, referee, goalkeeper
        - Training: 42,750 images, NVIDIA A100 40GB, ~14 hours
    """

    # Column order of the per-detection rows produced by process_image().
    _DETECTION_COLUMNS = [
        'class_name', 'class_id',
        'x1', 'y1', 'x2', 'y2',
        'width', 'height',
        'confidence',
        'center_x', 'center_y',
        'area',
    ]
    # process_video() prepends the frame index and its timestamp.
    _VIDEO_COLUMNS = ['frame', 'timestamp'] + _DETECTION_COLUMNS
    # Column order of the DataFrame returned by analyze_ball_possession().
    _POSSESSION_COLUMNS = [
        'frame', 'timestamp',
        'player_x', 'player_y',
        'ball_x', 'ball_y',
        'distance_to_ball',
        'ball_confidence', 'player_confidence',
    ]

    def __init__(self, model_path: str = "weights/checkpoint_best_regular.pth", device: str = "auto"):
        """
        Initialize the RF-DETR SoccerNet model.

        Args:
            model_path: Path to the model checkpoint (default: "weights/checkpoint_best_regular.pth")
            device: Device to use ("cuda", "cpu", or "auto" for automatic selection)

        Raises:
            FileNotFoundError: If the checkpoint file does not exist.
        """
        if device == "auto":
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
        else:
            self.device = device

        print(f"🚀 Initializing RF-DETR SoccerNet on {self.device.upper()}")

        # Index in this list == class id emitted by the detection head.
        self.class_names = ['ball', 'player', 'referee', 'goalkeeper']
        self.num_classes = len(self.class_names)

        # Static metadata that is embedded in saved JSON results.
        # NOTE(review): "architecture" says RF-DETR-Large while _load_model()
        # instantiates RFDETRBase - confirm which one the checkpoint matches.
        self.model_info = {
            "architecture": "RF-DETR-Large",
            "parameters": "128M",
            "input_size": [1280, 1280],
            "performance": {
                "mAP@50": 0.857,
                "mAP": 0.498,
                "mAP@75": 0.520,
            },
        }

        self.model_path = Path(model_path)
        self.model = None
        self._load_model()

        print("✅ RF-DETR SoccerNet ready for inference!")

    def _load_model(self):
        """
        Load the RF-DETR model with trained checkpoint.

        Raises:
            FileNotFoundError: If ``self.model_path`` does not exist.
            Exception: Any error raised while building the network or loading
                the weights is reported on stdout and re-raised unchanged.
        """
        try:
            print(f"📦 Loading model from {self.model_path}...")

            # Fail fast: check the checkpoint before building the network.
            if not self.model_path.exists():
                raise FileNotFoundError(f"Checkpoint not found: {self.model_path}")

            self.model = RFDETRBase()

            print(f"🔧 Reinitializing detection head for {self.num_classes} classes...")
            self.model.model.model.reinitialize_detection_head(self.num_classes)

            # SECURITY NOTE: weights_only=False unpickles arbitrary objects;
            # only load checkpoints that come from a trusted source.
            checkpoint = torch.load(str(self.model_path), map_location=self.device, weights_only=False)

            # The weights may be wrapped under one of two keys, or the
            # checkpoint may itself be the state dict.
            if 'model' in checkpoint:
                model_state = checkpoint['model']
            elif 'model_state_dict' in checkpoint:
                model_state = checkpoint['model_state_dict']
            else:
                model_state = checkpoint

            self.model.model.model.load_state_dict(model_state)

            if 'best_mAP' in checkpoint:
                print(f"📊 Model mAP: {checkpoint['best_mAP']:.3f}")
            if 'epoch' in checkpoint:
                print(f"📈 Trained epochs: {checkpoint['epoch']}")

            self.model.model.model.to(self.device)
            self.model.model.model.eval()

            print("✅ Model loaded successfully!")

        except Exception as e:
            print(f"❌ Error loading model: {e}")
            raise

    def _detections_to_records(self, detections, extra: Optional[Dict] = None) -> List[Dict]:
        """
        Convert a detections object into a list of flat row dictionaries.

        Args:
            detections: Object returned by ``self.model.predict``; it is read
                through ``len()``, ``.class_id[i]``, ``.xyxy[i].tolist()`` and
                ``.confidence[i]``. ``None`` or an empty object yields ``[]``.
            extra: Optional key/value pairs placed at the front of every row
                (used by process_video() for 'frame' and 'timestamp').

        Returns:
            One dict per detection whose class id maps to ``self.class_names``.
            Detections with an out-of-range class id are skipped.
        """
        if detections is None or len(detections) == 0:
            return []

        prefix = dict(extra) if extra else {}
        records = []
        for i in range(len(detections)):
            # Deliberately best-effort: one malformed detection must not
            # discard the remaining detections of the frame.
            try:
                class_id = int(detections.class_id[i])
                if not 0 <= class_id < len(self.class_names):
                    continue

                x1, y1, x2, y2 = (float(v) for v in detections.xyxy[i].tolist())
                box_w = x2 - x1
                box_h = y2 - y1

                records.append({
                    **prefix,
                    'class_name': self.class_names[class_id],
                    'class_id': class_id,
                    'x1': x1,
                    'y1': y1,
                    'x2': x2,
                    'y2': y2,
                    'width': box_w,
                    'height': box_h,
                    'confidence': float(detections.confidence[i]),
                    'center_x': (x1 + x2) / 2,
                    'center_y': (y1 + y2) / 2,
                    'area': box_w * box_h,
                })
            except Exception as e:
                print(f"⚠️ Error processing detection {i}: {e}")
                continue
        return records

    @staticmethod
    def _class_counts(df: pd.DataFrame) -> Dict:
        """Return ``{class_name: count}`` with plain ``int`` values ({} for an empty frame)."""
        if len(df) == 0:
            return {}
        return {name: int(count) for name, count in df['class_name'].value_counts().items()}

    @staticmethod
    def _json_default(obj):
        """``json.dump`` fallback: unwrap NumPy scalars into native Python values."""
        if isinstance(obj, np.generic):
            return obj.item()
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

    def process_video(self,
                      video_path: str,
                      confidence_threshold: float = 0.5,
                      frame_skip: int = 1,
                      max_frames: Optional[int] = None,
                      save_results: bool = False,
                      output_dir: Optional[str] = None) -> pd.DataFrame:
        """
        Process a video and return detections as DataFrame.

        Args:
            video_path: Path to input video
            confidence_threshold: Minimum confidence for detections (0.0-1.0)
            frame_skip: Process every N frames (1 = all frames)
            max_frames: Maximum frames to read (None or 0 = all)
            save_results: Whether to save results to file
            output_dir: Directory to save results (optional)

        Returns:
            DataFrame with columns: frame, timestamp, class_name, class_id,
            x1, y1, x2, y2, width, height, confidence, center_x, center_y, area.
            The columns are present even when nothing was detected.
            ``timestamp`` is ``frame / fps`` in seconds, or None when the
            container does not report a positive FPS.

        Raises:
            ValueError: If ``frame_skip`` is smaller than 1 or the video
                cannot be opened.
        """
        if frame_skip < 1:
            raise ValueError(f"frame_skip must be >= 1, got {frame_skip}")

        print(f"🎬 Processing video: {video_path}")

        cap = cv2.VideoCapture(os.fspath(video_path))
        progress = None
        records = []
        frame_count = 0
        processed_count = 0

        # try/finally so the capture handle and progress bar are always
        # released, even when inference raises mid-video.
        try:
            if not cap.isOpened():
                raise ValueError(f"Could not open video: {video_path}")

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            print(f"📹 Video info: {total_frames} frames, {fps:.2f} FPS, {width}x{height}")

            # A non-positive frame count means "unknown": read until the
            # stream ends instead of processing nothing.
            frame_limit = total_frames if total_frames > 0 else None
            if max_frames:
                frame_limit = max_frames if frame_limit is None else min(frame_limit, max_frames)

            # Without a positive FPS no timestamp can be derived.
            has_fps = fps > 0

            start_time = time.time()
            progress = tqdm(total=frame_limit, desc="Processing frames", unit="frame")

            while cap.isOpened() and (frame_limit is None or frame_count < frame_limit):
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_count % frame_skip == 0:
                    # OpenCV decodes to BGR; the model is fed an RGB PIL image.
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    pil_image = Image.fromarray(frame_rgb)

                    with torch.no_grad():
                        detections = self.model.predict(pil_image, threshold=confidence_threshold)

                    records.extend(self._detections_to_records(detections, {
                        'frame': frame_count,
                        'timestamp': frame_count / fps if has_fps else None,
                    }))
                    processed_count += 1

                frame_count += 1
                progress.update(1)
        finally:
            cap.release()
            if progress is not None:
                progress.close()

        df = pd.DataFrame(records, columns=self._VIDEO_COLUMNS)

        processing_time = time.time() - start_time
        fps_processed = processed_count / processing_time if processing_time > 0 else 0

        print("\n✅ Processing complete!")
        print("📊 Stats:")
        print(f" - Total frames: {frame_count:,}")
        print(f" - Frames processed: {processed_count:,}")
        print(f" - Processing time: {processing_time:.1f}s")
        print(f" - Processing speed: {fps_processed:.1f} FPS")
        print(f" - Total detections: {len(df):,}")

        if len(df) > 0:
            print("\n🎯 Detections by class:")
            for class_name, count in df['class_name'].value_counts().items():
                percentage = (count / len(df)) * 100
                print(f" - {class_name}: {count:,} ({percentage:.1f}%)")

        if save_results:
            self._save_video_results(df, video_path, output_dir, {
                'total_frames': frame_count,
                'processed_frames': processed_count,
                'processing_time': processing_time,
                'fps_processed': fps_processed,
                'video_fps': fps,
                'video_resolution': f"{width}x{height}",
            })

        return df

    def process_image(self,
                      image_path: str,
                      confidence_threshold: float = 0.5) -> pd.DataFrame:
        """
        Process a single image and return detections as DataFrame.

        Args:
            image_path: Path to input image
            confidence_threshold: Minimum confidence for detections

        Returns:
            DataFrame with columns: class_name, class_id, x1, y1, x2, y2,
            width, height, confidence, center_x, center_y, area. The columns
            are present even when nothing was detected.

        Raises:
            FileNotFoundError: If ``image_path`` does not exist.
            ValueError: If OpenCV cannot decode the file.
        """
        print(f"🖼️ Processing image: {image_path}")

        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image not found: {image_path}")

        image = cv2.imread(os.fspath(image_path))
        if image is None:
            raise ValueError(f"Could not load image: {image_path}")

        # OpenCV decodes to BGR; the model is fed an RGB PIL image.
        pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

        start_time = time.time()
        with torch.no_grad():
            detections = self.model.predict(pil_image, threshold=confidence_threshold)
        inference_time = time.time() - start_time

        df = pd.DataFrame(self._detections_to_records(detections), columns=self._DETECTION_COLUMNS)

        print(f"✅ Found {len(df)} detections in {inference_time:.3f}s")
        if len(df) > 0:
            print("🎯 Detections:")
            for class_name, count in df['class_name'].value_counts().items():
                print(f" - {class_name}: {count}")

        return df

    def save_results(self, df: pd.DataFrame, output_path: str, format: str = 'csv', include_metadata: bool = True):
        """
        Save DataFrame results to file with optional metadata.

        Args:
            df: DataFrame with detections
            output_path: Output file path (parent directories are created)
            format: 'csv', 'json', or 'parquet' (case-insensitive)
            include_metadata: Whether to include model metadata (JSON only)

        Raises:
            ValueError: If ``format`` is not one of the supported values; this
                is checked before anything is written to disk.
        """
        fmt = format.lower()
        if fmt not in ('csv', 'json', 'parquet'):
            raise ValueError(f"Unsupported format: {format}. Use 'csv', 'json', or 'parquet'")

        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        if fmt == 'csv':
            df.to_csv(output_path, index=False)
        elif fmt == 'parquet':
            df.to_parquet(output_path, index=False)
        else:
            result_data = {
                'detections': df.to_dict('records'),
                'summary': {
                    'total_detections': len(df),
                    'classes': self._class_counts(df),
                },
            }
            if include_metadata:
                result_data['model_info'] = self.model_info

            with open(output_path, 'w') as f:
                json.dump(result_data, f, indent=2, default=self._json_default)

        print(f"💾 Results saved to {output_path}")

    def _save_video_results(self, df: pd.DataFrame, video_path: str, output_dir: Optional[str], stats: Dict):
        """
        Save video processing results with comprehensive metadata.

        Writes ``<video stem>_detections.csv`` and ``<video stem>_analysis.json``.

        Args:
            df: Detections returned by process_video().
            video_path: Path of the processed video; its stem names the files.
            output_dir: Target directory (created if missing). ``None`` means
                the directory that contains the video.
            stats: Extra key/value pairs merged into the JSON ``video_info``.
        """
        video_file = Path(video_path)
        target_dir = video_file.parent if output_dir is None else Path(output_dir)
        # Mirror save_results(): make sure the destination exists.
        target_dir.mkdir(parents=True, exist_ok=True)

        video_name = video_file.stem
        csv_path = target_dir / f"{video_name}_detections.csv"
        json_path = target_dir / f"{video_name}_analysis.json"

        df.to_csv(csv_path, index=False)

        if len(df) > 0:
            confidence = df['confidence']
            confidence_stats = {
                'mean': float(confidence.mean()),
                'median': float(confidence.median()),
                'min': float(confidence.min()),
                'max': float(confidence.max()),
            }
        else:
            confidence_stats = {'mean': 0, 'median': 0, 'min': 0, 'max': 0}

        analysis = {
            'video_info': {
                'path': str(video_path),
                'name': video_name,
                **stats,
            },
            'detection_summary': {
                'total_detections': len(df),
                'classes': self._class_counts(df),
                'confidence_stats': confidence_stats,
            },
            'model_info': self.model_info,
            'detections': df.to_dict('records'),
        }

        with open(json_path, 'w') as f:
            json.dump(analysis, f, indent=2, default=self._json_default)

        print("📁 Analysis saved:")
        print(f" - CSV: {csv_path}")
        print(f" - JSON: {json_path}")

    def analyze_ball_possession(self, df: pd.DataFrame, distance_threshold: float = 100) -> pd.DataFrame:
        """
        Analyze which players are near the ball (ball possession analysis).

        For every frame that contains a ball, the highest-confidence ball
        detection is paired with each 'player' detection of the same frame.
        Pairs whose centre-to-centre distance is at most
        ``distance_threshold`` are reported.

        Args:
            df: DataFrame from process_video()
            distance_threshold: Maximum distance to consider "near ball" (pixels)

        Returns:
            DataFrame with ball possession events and columns: frame,
            timestamp, player_x, player_y, ball_x, ball_y, distance_to_ball,
            ball_confidence, player_confidence. Empty (with these columns)
            when there are no events.
        """
        print(f"⚽ Analyzing ball possession (threshold: {distance_threshold}px)")

        if df is None or len(df) == 0:
            print("❌ No possession events found")
            return pd.DataFrame(columns=self._POSSESSION_COLUMNS)

        balls = df[df['class_name'] == 'ball']
        players = df[df['class_name'] == 'player']

        # Keep a single ball per frame: the most confident one. The stable
        # sort plus sort_index() keeps the frames in their original order.
        balls = (
            balls.sort_values('confidence', ascending=False, kind='stable')
            .drop_duplicates('frame')
            .sort_index()
        )

        ball_cols = balls[['frame', 'center_x', 'center_y', 'confidence']].rename(columns={
            'center_x': 'ball_x',
            'center_y': 'ball_y',
            'confidence': 'ball_confidence',
        })
        player_cols = players[['frame', 'timestamp', 'center_x', 'center_y', 'confidence']].rename(columns={
            'center_x': 'player_x',
            'center_y': 'player_y',
            'confidence': 'player_confidence',
        })

        # One row per (ball, player) pair sharing a frame.
        pairs = ball_cols.merge(player_cols, on='frame', how='inner')
        pairs['distance_to_ball'] = np.hypot(
            pairs['ball_x'] - pairs['player_x'],
            pairs['ball_y'] - pairs['player_y'],
        )

        near_ball = pairs['distance_to_ball'] <= distance_threshold
        possession_df = pairs.loc[near_ball, self._POSSESSION_COLUMNS].reset_index(drop=True)

        if len(possession_df) > 0:
            print(f"✅ Found {len(possession_df)} possession events")
            print(f"🎯 Average distance to ball: {possession_df['distance_to_ball'].mean():.1f}px")
        else:
            print("❌ No possession events found")

        return possession_df

    def get_model_info(self) -> Dict:
        """
        Get comprehensive model information.

        Returns:
            ``self.model_info`` extended with 'classes', 'device' and
            'checkpoint_path'.
        """
        return {
            **self.model_info,
            'classes': self.class_names,
            'device': self.device,
            'checkpoint_path': str(self.model_path),
        }
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Minimal smoke demo: constructing the wrapper loads the default checkpoint
    # and raises FileNotFoundError when that file is missing.
    print("🚀 RF-DETR SoccerNet Demo")
    model = RFDETRSoccerNet()

    # Example usage (replace the paths with real files):
    #   df = model.process_video("match.mp4", confidence_threshold=0.5, save_results=True)
    #   possession = model.analyze_ball_possession(df, distance_threshold=100)
    #   model.save_results(df, "results/detections.json", format="json")

    print("✅ Demo complete! Replace with your video path to test.")