""" HarpoonNet Webcam Detection with ByteTrack Real-time drone detection and tracking using webcam """ import cv2 import torch import numpy as np import argparse import time from PIL import Image from torchvision import transforms from harpoon_modular import create_harpoon_net_modular from bytetrack import BYTETracker import random class WebcamDetectorWithTracking: """Webcam detector with ByteTrack integration""" def __init__(self, model_path, device=None): """Initialize detector with tracking""" self.model_path = model_path self.device = device or torch.device('cuda' if torch.cuda.is_available() else 'cpu') self.conf_thresh = 0.6 # Increased from 0.35 to be less sensitive self.debug_view = False self.last_time = None # Initialize ByteTracker self.tracker = BYTETracker( frame_rate=30, track_thresh=0.7, # Increased from 0.5 - higher threshold for new tracks track_buffer=30, match_thresh=0.8 ) # Colors for different track IDs self.colors = self.generate_colors(50) # Load model print(f"šŸ”„ Loading model from {model_path}") self.model = create_harpoon_net_modular(pretrained=False) # Load checkpoint checkpoint = torch.load(model_path, map_location=self.device) if 'model_state_dict' in checkpoint: # Full checkpoint file self.model.load_state_dict(checkpoint['model_state_dict']) else: # Just model weights self.model.load_state_dict(checkpoint) self.model = self.model.to(self.device) self.model.eval() print("āœ… Model loaded successfully") # Setup webcam self.cap = cv2.VideoCapture(0) if not self.cap.isOpened(): raise RuntimeError("āŒ Could not open webcam") # Get webcam resolution self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) print(f"šŸ“¹ Camera resolution: {self.frame_width}x{self.frame_height}") print(f"šŸŽÆ Initial confidence threshold: {self.conf_thresh:.2f}") def generate_colors(self, num_colors): """Generate random colors for track visualization""" colors = [] for _ in range(num_colors): color = ( random.randint(0, 255), random.randint(0, 255), random.randint(0, 255) ) colors.append(color) return colors def preprocess_frame(self, frame): """Preprocess frame for model input""" # Get original dimensions orig_h, orig_w = frame.shape[:2] # Calculate scaling to maintain aspect ratio input_size = 320 scale = min(input_size / orig_w, input_size / orig_h) new_w = int(orig_w * scale) new_h = int(orig_h * scale) # Resize maintaining aspect ratio resized = cv2.resize(frame, (new_w, new_h)) # Create square canvas with padding square = np.zeros((input_size, input_size, 3), dtype=np.uint8) # Center the resized image in the square x_offset = (input_size - new_w) // 2 y_offset = (input_size - new_h) // 2 square[y_offset:y_offset+new_h, x_offset:x_offset+new_w] = resized # Convert to RGB img = cv2.cvtColor(square, cv2.COLOR_BGR2RGB) # Convert to PIL Image for transforms img = Image.fromarray(img) # Apply transforms transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ]) # Apply transform and add batch dimension img = transform(img).unsqueeze(0) return img, (scale, x_offset, y_offset) def postprocess_detections(self, detections, preprocess_info, frame_shape): """Convert normalized coordinates back to original frame coordinates""" scale, x_offset, y_offset = preprocess_info orig_h, orig_w = frame_shape[:2] processed_detections = [] for i, det in enumerate(detections[0]['boxes']): if len(det) == 4 and i < len(detections[0]['scores']): score = 
    def postprocess_detections(self, detections, preprocess_info, frame_shape):
        """Map boxes from 320x320 model-input space back to original frame coordinates"""
        scale, x_offset, y_offset = preprocess_info
        orig_h, orig_w = frame_shape[:2]

        processed_detections = []
        for i, det in enumerate(detections[0]['boxes']):
            if len(det) == 4 and i < len(detections[0]['scores']):
                score = detections[0]['scores'][i]

                # Apply the adjustable confidence threshold here
                if score < self.conf_thresh:
                    continue

                # Undo padding offset and scaling
                x1 = (det[0] - x_offset) / scale
                y1 = (det[1] - y_offset) / scale
                x2 = (det[2] - x_offset) / scale
                y2 = (det[3] - y_offset) / scale

                # Clip to frame boundaries
                x1 = np.clip(x1, 0, orig_w)
                y1 = np.clip(y1, 0, orig_h)
                x2 = np.clip(x2, 0, orig_w)
                y2 = np.clip(y2, 0, orig_h)

                # Keep only boxes of reasonable size (over 10px, under 90% of the frame)
                w = x2 - x1
                h = y2 - y1
                if 10 < w < orig_w * 0.9 and 10 < h < orig_h * 0.9:
                    processed_detections.append([x1, y1, x2, y2, score])

        return processed_detections

    def process_frame(self, frame):
        """Process a single frame with detection and tracking"""
        # Preprocess
        img, preprocess_info = self.preprocess_frame(frame)
        img = img.to(self.device)

        # Run inference; decode with a low threshold to keep raw detections
        with torch.no_grad():
            predictions = self.model(img)
            detections = self.model.decode_predictions(predictions, confidence_threshold=0.1)

        # Postprocess detections (applies our adjustable confidence threshold)
        detection_list = self.postprocess_detections(detections, preprocess_info, frame.shape)

        # Update tracker
        tracks = self.tracker.update(detection_list)

        # Draw tracks
        frame_with_tracks = self.draw_tracks(frame, tracks)

        # Overlay FPS, track count, detection count, and current threshold
        now = time.time()
        dt = now - self.last_time if self.last_time else 0.0
        fps = 1.0 / dt if dt > 0 else 0.0
        self.last_time = now
        cv2.putText(frame_with_tracks,
                    f"FPS: {fps:.1f} | Tracks: {len(tracks)} | Dets: {len(detection_list)} | Conf: {self.conf_thresh:.2f}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

        return frame_with_tracks

    def draw_tracks(self, frame, tracks):
        """Draw tracking results on frame"""
        frame_with_tracks = frame.copy()

        for track in tracks:
            if not track.is_activated:
                continue

            # Get track info
            track_id = track.track_id
            bbox = track.tlbr
            score = track.score

            # Pick a stable color for this track ID
            color = self.colors[track_id % len(self.colors)]

            # Draw bounding box
            x1, y1, x2, y2 = map(int, bbox)
            cv2.rectangle(frame_with_tracks, (x1, y1), (x2, y2), color, 2)

            # Draw track ID and confidence on a filled label background
            label = f"ID:{track_id} ({score:.2f})"
            label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
            cv2.rectangle(frame_with_tracks,
                          (x1, y1 - label_size[1] - 10),
                          (x1 + label_size[0], y1),
                          color, -1)
            cv2.putText(frame_with_tracks, label, (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # Draw track history (trail) from the last 10 positions
            if hasattr(track, 'history') and len(track.history) > 1:
                points = []
                for hist_bbox in list(track.history.values())[-10:]:
                    center_x = int((hist_bbox[0] + hist_bbox[2]) / 2)
                    center_y = int((hist_bbox[1] + hist_bbox[3]) / 2)
                    points.append((center_x, center_y))
                for i in range(1, len(points)):
                    cv2.line(frame_with_tracks, points[i - 1], points[i], color, 2)

            # Draw center point
            center_x = int((x1 + x2) / 2)
            center_y = int((y1 + y2) / 2)
            cv2.circle(frame_with_tracks, (center_x, center_y), 3, color, -1)

        return frame_with_tracks
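    # Note on the tracker interface as used above (the local `bytetrack`
    # module is assumed to follow the reference ByteTrack conventions):
    # update() takes detections as [[x1, y1, x2, y2, score], ...] in pixel
    # coordinates and returns track objects exposing track_id, tlbr, score,
    # is_activated, and an optional `history` dict of past boxes.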
    def run(self):
        """Run webcam detection with tracking"""
        print("šŸŽ„ Starting detection with tracking...")
        print("Controls:")
        print("  'q': Quit")
        print("  'd': Toggle debug view")
        print("  '+': Increase confidence threshold")
        print("  '-': Decrease confidence threshold")
        print("  'r': Reset tracker")

        self.last_time = time.time()
        while True:
            ret, frame = self.cap.read()
            if not ret:
                break

            # Process frame
            frame_with_tracks = self.process_frame(frame)

            # Show frame
            cv2.imshow('HarpoonNet Detection + Tracking', frame_with_tracks)

            # Handle key presses
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
            elif key == ord('d'):
                self.debug_view = not self.debug_view
            elif key == ord('+'):
                self.conf_thresh = min(1.0, self.conf_thresh + 0.05)
                print(f"Confidence threshold: {self.conf_thresh:.2f}")
            elif key == ord('-'):
                self.conf_thresh = max(0.05, self.conf_thresh - 0.05)
                print(f"Confidence threshold: {self.conf_thresh:.2f}")
            elif key == ord('r'):
                # Reset tracker with the same parameters as in __init__
                self.tracker = BYTETracker(
                    frame_rate=30,
                    track_thresh=0.7,
                    track_buffer=30,
                    match_thresh=0.8
                )
                print("šŸ”„ Tracker reset")

        self.cap.release()
        cv2.destroyAllWindows()


def main():
    """Main function"""
    parser = argparse.ArgumentParser(description='HarpoonNet Webcam Detection with Tracking')
    parser.add_argument('--model', type=str, required=True, help='Path to model file')
    parser.add_argument('--device', type=str, default=None, help='Device to use (cuda/cpu)')
    args = parser.parse_args()

    # Set device
    if args.device:
        device = torch.device(args.device)
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"šŸš€ Using device: {device}")

    try:
        # Create detector and run
        detector = WebcamDetectorWithTracking(args.model, device)
        detector.run()
    except KeyboardInterrupt:
        print("\nā¹ļø Detection stopped by user")
    except Exception as e:
        print(f"āŒ Error: {e}")


if __name__ == '__main__':
    main()
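# Example invocation (script and checkpoint names are illustrative):
#   python webcam_tracking.py --model checkpoints/harpoon_best.pth
#   python webcam_tracking.py --model checkpoints/harpoon_best.pth --device cpu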