#!/usr/bin/env python3
# Source: harpoon-1-2 / webcam_detection_harpoonnet12.py
# Uploaded by: christiankhoury05
# Commit message: HarpoonNet 1.2 - ConvNeXt-Small Architecture (~50M params) - Fixed Model Upload
# Commit: 376e782 (verified)
"""
🔴 HarpoonNet 1.2 Real-Time Webcam Detection
Professional real-time drone detection using HarpoonNet 1.2
"""
import cv2
import torch
import numpy as np
import time
import sys
import os
import argparse
from pathlib import Path
# Import HarpoonNet 1.2
from harpoon_modular import create_harpoon_net_12
class HarpoonNet12WebcamDetector:
    """Real-time webcam drone detection using HarpoonNet 1.2.

    Creating an instance loads the checkpoint and opens the camera; either
    step terminates the process with exit status 1 on failure. ``run()`` then
    processes frames until the user quits.

    Args:
        model_path: Checkpoint file. Either a bare state dict or a dict that
            stores one under ``'model_state_dict'``.
        confidence_threshold: Threshold handed to the model's
            ``decode_predictions``; adjustable at runtime with +/-.
        camera_index: Device index passed to ``cv2.VideoCapture``.
    """

    FPS_WINDOW = 30        # frame timestamps kept for the rolling FPS value
    HISTORY_WINDOW = 100   # per-frame detection counts kept for the overlay
    STATUS_INTERVAL = 500  # frames between periodic console status lines
    WINDOW_TITLE = 'HarpoonNet 1.2 - Real-Time Drone Detection'

    def __init__(self, model_path="pytorch_model.pth", confidence_threshold=0.3,
                 camera_index=0):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.input_size = 544
        self.confidence_threshold = confidence_threshold
        self.model_path = model_path
        self.camera_index = camera_index

        print("🔴 HarpoonNet 1.2 - Real-Time Webcam Detection")
        print(" 🏗️ Architecture: ConvNeXt-Small + Enhanced HarpoonHead")
        print(f" 📍 Device: {self.device}")
        print(f" 🎯 Confidence: {confidence_threshold}")
        print(f" 📐 Input Size: {self.input_size}x{self.input_size}")
        print("=" * 60)

        # Performance tracking
        self._reset_stats()

        # Load the model
        self.model = self._load_model()

        # Initialize webcam
        self._init_webcam()

        print("🚀 HarpoonNet 1.2 ready for real-time detection!")

    def _reset_stats(self):
        """Zero every performance counter and history buffer."""
        self.fps_history = []            # timestamps of the most recent frames
        self.detection_history = []      # detection count of the most recent frames
        self.frame_count = 0
        # Cumulative counters: detection_history is capped, so session totals
        # cannot be derived from it.
        self.total_detections = 0
        self.frames_with_detections = 0

    def _load_model(self):
        """Build the network, load the checkpoint and return it in eval mode.

        Exits the process with status 1 when the file is missing or loading
        fails.
        """
        print(f"📦 Loading HarpoonNet 1.2 from: {self.model_path}")
        if not Path(self.model_path).exists():
            print(f"❌ Model file not found: {self.model_path}")
            sys.exit(1)

        # Create model architecture
        model = create_harpoon_net_12(num_classes=1, num_anchors=3, pretrained=False)
        try:
            # SECURITY: torch.load unpickles the file, which can execute
            # arbitrary code. Only load checkpoints from a trusted source
            # (consider weights_only=True where the installed PyTorch
            # supports it).
            checkpoint = torch.load(self.model_path, map_location='cpu')
            if 'model_state_dict' in checkpoint:
                state_dict = checkpoint['model_state_dict']
                print(" ✅ Loaded full checkpoint")
                if 'architecture' in checkpoint:
                    print(f" 🏗️ Architecture: {checkpoint['architecture']}")
                if 'parameters' in checkpoint:
                    params = checkpoint['parameters']
                    # Purely informational: a non-integer value must not
                    # abort loading through a format error.
                    shown = f"{params:,}" if isinstance(params, int) else str(params)
                    print(f" 📊 Parameters: {shown}")
            else:
                state_dict = checkpoint
                print(" ✅ Loaded model weights")

            # Load state dict (expects keys without a DataParallel prefix)
            model.load_state_dict(state_dict)
            model = model.to(self.device)
            model.eval()

            param_count = sum(p.numel() for p in model.parameters())
            print(f" 📈 Total Parameters: {param_count:,}")
            print(" 🎯 Model loaded and ready!")
            return model
        except Exception as e:
            print(f" ❌ Error loading model: {e}")
            sys.exit(1)

    def _init_webcam(self):
        """Open the capture device and request 640x480 @ 30 FPS.

        Exits the process with status 1 when the device cannot be opened.
        The values printed are the ones the driver reports back, which may
        differ from the requested ones.
        """
        print("📹 Initializing webcam...")
        self.cap = cv2.VideoCapture(self.camera_index)
        if not self.cap.isOpened():
            print("❌ Failed to open webcam")
            sys.exit(1)

        # Set webcam properties for best performance
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        self.cap.set(cv2.CAP_PROP_FPS, 30)

        # Get actual properties
        width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = self.cap.get(cv2.CAP_PROP_FPS)
        print(f" ✅ Camera: {width}x{height} @ {fps} FPS")

    def _preprocess_frame(self, frame):
        """Turn a BGR frame into a 1x3xSxS float tensor on ``self.device``.

        The frame is resized to ``input_size`` squared (aspect ratio is not
        preserved), converted to RGB and scaled to [0, 1].
        """
        # NOTE(review): no mean/std normalisation is applied here; confirm
        # this matches how the model was trained.
        resized = cv2.resize(frame, (self.input_size, self.input_size))
        rgb_frame = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
        normalized = rgb_frame.astype(np.float32) / 255.0
        # HWC -> CHW, then add the batch dimension
        tensor = torch.from_numpy(normalized).permute(2, 0, 1).unsqueeze(0)
        return tensor.to(self.device)

    @staticmethod
    def _to_plain(value):
        """Convert tensors/arrays to plain Python data; pass other values through."""
        return value.tolist() if hasattr(value, 'tolist') else value

    def _get_detections(self, input_tensor):
        """Run the model on one batch and return the first image's detections.

        Returns:
            A list of ``{'box', 'score', 'class'}`` dicts. Boxes that are not
            4-element sequences are skipped; a missing score defaults to 0.5.
        """
        with torch.no_grad():
            predictions = self.model(input_tensor)
            # Decode predictions
            detections = self.model.decode_predictions(
                predictions,
                confidence_threshold=self.confidence_threshold,
            )

        if detections is None or len(detections) == 0:
            return []

        # Only one frame is submitted per call, so only entry 0 matters.
        detection = detections[0]
        # decode_predictions may hand back tensors or arrays instead of
        # lists; normalise them so such boxes are not silently discarded.
        boxes = self._to_plain(detection.get('boxes', []))
        scores = self._to_plain(detection.get('scores', []))
        if boxes is None:
            boxes = []
        if scores is None:
            scores = []

        processed_detections = []
        for i, box in enumerate(boxes):
            box = self._to_plain(box)
            if isinstance(box, (list, tuple)) and len(box) == 4:
                score = self._to_plain(scores[i]) if i < len(scores) else 0.5
                processed_detections.append({
                    'box': box,
                    'score': score,
                    'class': 'drone',
                })
        return processed_detections

    def _current_fps(self):
        """Return the FPS over the recorded timestamps without adding one."""
        if len(self.fps_history) < 2:
            return 0.0
        time_span = self.fps_history[-1] - self.fps_history[0]
        if time_span <= 0:
            # Identical timestamps (coarse clock) or a clock step backwards.
            return 0.0
        return (len(self.fps_history) - 1) / time_span

    def _calculate_fps(self):
        """Record a frame timestamp and return the running FPS.

        Call once per processed frame; use ``_current_fps`` to read the value
        without registering another frame.
        """
        self.fps_history.append(time.time())
        # Keep only the most recent timestamps for the FPS calculation
        if len(self.fps_history) > self.FPS_WINDOW:
            del self.fps_history[:-self.FPS_WINDOW]
        return self._current_fps()

    def _record_detections(self, detections):
        """Add one frame's detection count to the history and the totals."""
        count = len(detections)
        self.detection_history.append(count)
        if len(self.detection_history) > self.HISTORY_WINDOW:
            del self.detection_history[:-self.HISTORY_WINDOW]
        self.total_detections += count
        if count:
            self.frames_with_detections += 1

    def _draw_detections(self, frame, detections):
        """Draw the overlay and one red box per detection onto ``frame``.

        The frame is modified in place. The call also registers a frame
        timestamp and the frame's detection count, so it is meant to run
        exactly once per frame.

        Args:
            frame: BGR image to annotate.
            detections: Dicts with ``'box'`` and ``'score'`` keys. Box values
                are multiplied by the frame width/height, i.e. they are
                treated as normalised coordinates.
        """
        h, w = frame.shape[:2]
        font = cv2.FONT_HERSHEY_SIMPLEX

        # Color scheme (OpenCV uses BGR order)
        red_color = (0, 0, 255)        # boxes and labels
        text_color = (255, 255, 255)   # white text
        bg_color = (0, 0, 0)           # black background for text
        header_color = (0, 255, 255)   # yellow header text

        fps = self._calculate_fps()

        # Header line on a filled background
        header_text = f"HarpoonNet 1.2 | FPS: {fps:.1f} | Detections: {len(detections)}"
        (text_w, text_h), baseline = cv2.getTextSize(header_text, font, 0.7, 2)
        cv2.rectangle(frame, (10, 10),
                      (10 + text_w + 10, 10 + text_h + baseline + 10), bg_color, -1)
        cv2.putText(frame, header_text, (15, 10 + text_h + 5),
                    font, 0.7, header_color, 2)

        # Model info
        model_text = f"ConvNeXt-Small | Conf: {self.confidence_threshold:.2f}"
        cv2.putText(frame, model_text, (15, 10 + text_h + 30),
                    font, 0.5, text_color, 1)

        # Bounding boxes
        for detection in detections:
            box = detection['box']
            score = detection['score']

            # Scale box coordinates to frame size
            try:
                x1, y1, x2, y2 = (int(v * s) for v, s in zip(box, (w, h, w, h)))
            except (ValueError, OverflowError):
                # NaN or infinite coordinate: skip the box, keep the stream alive.
                continue

            # Ensure coordinates are within frame
            x1 = max(0, min(x1, w - 1))
            y1 = max(0, min(y1, h - 1))
            x2 = max(0, min(x2, w - 1))
            y2 = max(0, min(y2, h - 1))

            cv2.rectangle(frame, (x1, y1), (x2, y2), red_color, 3)

            # Confidence label: above the box if there is room, else below
            label = f"Drone: {score:.3f}"
            (label_w, label_h), baseline = cv2.getTextSize(label, font, 0.6, 2)
            label_x = x1
            label_y = y1 - 10 if y1 > 30 else y2 + label_h + 10
            cv2.rectangle(frame, (label_x, label_y - label_h - 5),
                          (label_x + label_w + 10, label_y + 5), bg_color, -1)
            cv2.putText(frame, label, (label_x + 5, label_y),
                        font, 0.6, red_color, 2)

        # Track detection statistics
        self._record_detections(detections)

        # Performance stats, shown once some history has accumulated
        if len(self.detection_history) > 10:
            avg_detections = sum(self.detection_history) / len(self.detection_history)
            perf_text = f"Avg Detections: {avg_detections:.1f}"
            cv2.putText(frame, perf_text, (15, h - 40), font, 0.5, text_color, 1)

        # Controls hint
        controls_text = "Q=quit | S=save | +/-=confidence | R=reset stats"
        cv2.putText(frame, controls_text, (15, h - 15), font, 0.4, text_color, 1)

    def _handle_key(self, key, frame):
        """Apply one key press; return ``False`` when the loop should stop.

        Args:
            key: Low byte of ``cv2.waitKey`` (255 when no key was pressed).
            frame: The annotated frame, written to disk by the save command.
        """
        if key == 27:  # ESC
            return False

        # Letter commands are case-insensitive so they work with Caps Lock.
        char = chr(key).lower()
        if char == 'q':
            return False
        if char == 's':
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"harpoonnet12_detection_{timestamp}.jpg"
            # imwrite reports failure through its return value, not an exception.
            if cv2.imwrite(filename, frame):
                print(f"📸 Saved: {filename}")
            else:
                print(f"❌ Failed to save: {filename}")
        elif char in ('+', '='):
            # round() keeps repeated 0.05 steps free of float drift
            self.confidence_threshold = round(min(0.95, self.confidence_threshold + 0.05), 2)
            print(f"🔺 Confidence: {self.confidence_threshold:.2f}")
        elif char in ('-', '_'):
            self.confidence_threshold = round(max(0.05, self.confidence_threshold - 0.05), 2)
            print(f"🔻 Confidence: {self.confidence_threshold:.2f}")
        elif char == 'r':
            self._reset_stats()
            print("🔄 Performance stats reset")
        return True

    def run(self):
        """Main detection loop: capture, detect, draw, display, handle keys.

        Runs until Q/ESC is pressed, a frame cannot be read or Ctrl+C is
        received. Resources are always released on exit.
        """
        print("\n🔴 Starting HarpoonNet 1.2 real-time detection...")
        print(" Controls:")
        print(" Q = Quit")
        print(" S = Save current frame")
        print(" + = Increase confidence")
        print(" - = Decrease confidence")
        print(" R = Reset performance stats")
        print("\n🚀 Ready for real-time detection!")

        try:
            while True:
                ret, frame = self.cap.read()
                if not ret:
                    print("❌ Failed to capture frame")
                    break

                input_tensor = self._preprocess_frame(frame)
                detections = self._get_detections(input_tensor)
                self._draw_detections(frame, detections)
                cv2.imshow(self.WINDOW_TITLE, frame)

                # Count the frame before key handling so the frame on which
                # the user quits is included and a reset leaves the count at 0.
                self.frame_count += 1
                if self.frame_count % self.STATUS_INTERVAL == 0:
                    recent_detections = sum(self.detection_history[-50:])
                    print(f"📊 Frame {self.frame_count} | FPS: {self._current_fps():.1f} | "
                          f"Recent detections: {recent_detections}")

                key = cv2.waitKey(1) & 0xFF
                if not self._handle_key(key, frame):
                    break
        except KeyboardInterrupt:
            print("\n🛑 Detection stopped by user")
        finally:
            self._cleanup()

    def _cleanup(self):
        """Release the camera, close windows and print the session summary."""
        print("🧹 Cleaning up...")
        if hasattr(self, 'cap'):
            self.cap.release()
        cv2.destroyAllWindows()

        # Final statistics
        if len(self.fps_history) > 1:
            # Read-only FPS: no frame is being processed here.
            avg_fps = self._current_fps()
            print("\n📊 Final Performance:")
            print(f" 🎬 Frames processed: {self.frame_count}")
            print(f" ⚡ Average FPS: {avg_fps:.1f}")
            print(f" 🎯 Total detections: {self.total_detections}")
            if self.frame_count > 0:
                # Share of frames that contained at least one detection
                rate = self.frames_with_detections / self.frame_count * 100
                print(f" 📈 Detection rate: {rate:.1f}%")
        print("✅ Cleanup complete - HarpoonNet 1.2 session ended")
def main():
    """Parse command-line options and run the webcam detector.

    Exits with status 2 when ``--conf`` is outside [0, 1] and with status 1
    when the detector fails with an unexpected error, so callers and shell
    scripts can tell a failed session from a clean one.
    """
    parser = argparse.ArgumentParser(description='HarpoonNet 1.2 Real-Time Webcam Detection')
    parser.add_argument('--model', default='pytorch_model.pth',
                        help='Path to HarpoonNet 1.2 model file')
    parser.add_argument('--conf', type=float, default=0.3,
                        help='Confidence threshold (default: 0.3)')
    args = parser.parse_args()

    # Reject meaningless thresholds (including NaN) before loading the model.
    if not 0.0 <= args.conf <= 1.0:
        parser.error(f"--conf must be between 0 and 1, got {args.conf}")

    print("🔴 HARPOONNET 1.2 - REAL-TIME DETECTION")
    print("=" * 50)
    print("🏗️ Architecture: ConvNeXt-Small + Enhanced HarpoonHead")
    print("🚀 Features: Real-time webcam detection")
    print("📐 Input: 544x544 resolution")
    print("🔴 Style: Red bounding boxes")
    print("=" * 50)

    try:
        # Create detector
        detector = HarpoonNet12WebcamDetector(
            model_path=args.model,
            confidence_threshold=args.conf,
        )
        # Run detection
        detector.run()
    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
        # Report the failure through the exit status instead of returning
        # as if the session had succeeded.
        sys.exit(1)
# Run the detector only when executed as a script, not when imported.
if __name__ == "__main__":
    main()