""" |
|
HarpoonNet Webcam Detection with ByteTrack |
|
Real-time drone detection and tracking using webcam |
|
""" |
|
|
|
import cv2 |
|
import torch |
|
import numpy as np |
|
import argparse |
|
import time |
|
from PIL import Image |
|
from torchvision import transforms |
|
from harpoon_modular import create_harpoon_net_modular |
|
from bytetrack import BYTETracker |
|
import random |
|
|
|
|
|
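# Typical invocation (the script filename here is a placeholder; --model and
# --device are the flags defined in main() below):
#
#   python webcam_bytetrack.py --model path/to/checkpoint.pth --device cuda
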
class WebcamDetectorWithTracking:
    """Webcam detector with ByteTrack integration."""

    def __init__(self, model_path, device=None):
        """Initialize the model, tracker, and webcam capture."""
        self.model_path = model_path
        self.device = device or torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.conf_thresh = 0.6  # runtime-adjustable confidence cutoff ('+'/'-' keys)
        self.debug_view = False
        self.last_time = None   # timestamp of the previous frame, used for FPS
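
        # ByteTrack associates high-score detections with existing tracks
        # first, then uses the remaining low-score detections to recover
        # occluded tracks. Note that postprocess_detections already drops
        # anything below self.conf_thresh (0.6), so only scores in the
        # 0.6-0.7 band reach the low-score pass with these settings.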
        self.tracker = BYTETracker(
            frame_rate=30,      # expected input frame rate
            track_thresh=0.7,   # score threshold for the high-confidence pass
            track_buffer=30,    # frames a lost track is kept alive
            match_thresh=0.8    # IoU threshold for association
        )

        # Fixed palette so each track ID keeps a stable color across frames.
        self.colors = self.generate_colors(50)

print(f"π Loading model from {model_path}") |
|
self.model = create_harpoon_net_modular(pretrained=False) |
|
|
|
|
|
checkpoint = torch.load(model_path, map_location=self.device) |
|
if 'model_state_dict' in checkpoint: |
|
|
|
self.model.load_state_dict(checkpoint['model_state_dict']) |
|
else: |
|
|
|
self.model.load_state_dict(checkpoint) |
|
|
|
self.model = self.model.to(self.device) |
|
self.model.eval() |
|
print("β
Model loaded successfully") |
|
|
|
|
|
        self.cap = cv2.VideoCapture(0)
        if not self.cap.isOpened():
            raise RuntimeError("Could not open webcam")

        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"Camera resolution: {self.frame_width}x{self.frame_height}")
        print(f"Initial confidence threshold: {self.conf_thresh:.2f}")

    def generate_colors(self, num_colors):
        """Generate random BGR colors for track visualization."""
        return [
            (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
            for _ in range(num_colors)
        ]

    def preprocess_frame(self, frame):
        """Letterbox the frame to the model input size and normalize it."""
        orig_h, orig_w = frame.shape[:2]

        # Scale so the frame fits inside input_size x input_size while
        # preserving its aspect ratio.
        input_size = 320
        scale = min(input_size / orig_w, input_size / orig_h)
        new_w = int(orig_w * scale)
        new_h = int(orig_h * scale)

        resized = cv2.resize(frame, (new_w, new_h))

        # Center the resized image on a black square canvas (letterboxing).
        square = np.zeros((input_size, input_size, 3), dtype=np.uint8)
        x_offset = (input_size - new_w) // 2
        y_offset = (input_size - new_h) // 2
        square[y_offset:y_offset + new_h, x_offset:x_offset + new_w] = resized

        # OpenCV frames are BGR; the model expects RGB with ImageNet statistics.
        img = cv2.cvtColor(square, cv2.COLOR_BGR2RGB)
        img = Image.fromarray(img)

        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        img = transform(img).unsqueeze(0)

        # The scale and offsets are returned so detections can be mapped back
        # to original frame coordinates.
        return img, (scale, x_offset, y_offset)

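    # Worked example of the letterbox mapping (illustrative numbers): a 640x480
    # frame gives scale = min(320/640, 320/480) = 0.5, so the resized image is
    # 320x240 with x_offset = 0 and y_offset = 40. postprocess_detections
    # inverts this below: frame_x = (x - 0) / 0.5, frame_y = (y - 40) / 0.5.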
    def postprocess_detections(self, detections, preprocess_info, frame_shape):
        """Map boxes from letterboxed input coordinates back to the original frame."""
        scale, x_offset, y_offset = preprocess_info
        orig_h, orig_w = frame_shape[:2]
        processed_detections = []

        for i, det in enumerate(detections[0]['boxes']):
            if len(det) == 4 and i < len(detections[0]['scores']):
                score = detections[0]['scores'][i]

                # Enforce the runtime-adjustable confidence threshold.
                if score < self.conf_thresh:
                    continue

                # Undo the letterbox: subtract the padding offsets, then rescale.
                x1 = (det[0] - x_offset) / scale
                y1 = (det[1] - y_offset) / scale
                x2 = (det[2] - x_offset) / scale
                y2 = (det[3] - y_offset) / scale

                # Clamp to the frame bounds.
                x1 = np.clip(x1, 0, orig_w)
                y1 = np.clip(y1, 0, orig_h)
                x2 = np.clip(x2, 0, orig_w)
                y2 = np.clip(y2, 0, orig_h)

                # Reject degenerate boxes and near-full-frame false positives.
                w = x2 - x1
                h = y2 - y1
                if 10 < w < orig_w * 0.9 and 10 < h < orig_h * 0.9:
                    processed_detections.append([x1, y1, x2, y2, score])

        return processed_detections

    def process_frame(self, frame):
        """Run detection and tracking on one frame; return the annotated frame."""
        img, preprocess_info = self.preprocess_frame(frame)
        img = img.to(self.device)

        # Decode with a deliberately low threshold; the final cutoff is
        # applied in postprocess_detections via self.conf_thresh.
        with torch.no_grad():
            predictions = self.model(img)
            detections = self.model.decode_predictions(predictions, confidence_threshold=0.1)

        detection_list = self.postprocess_detections(detections, preprocess_info, frame.shape)

        # Feed this frame's detections to ByteTrack.
        tracks = self.tracker.update(detection_list)

        frame_with_tracks = self.draw_tracks(frame, tracks)

        # FPS measured over the whole detect + track + draw loop.
        fps = 1.0 / (time.time() - self.last_time) if self.last_time else 0.0
        self.last_time = time.time()

        cv2.putText(frame_with_tracks,
                    f"FPS: {fps:.1f} | Tracks: {len(tracks)} | Dets: {len(detection_list)} | Conf: {self.conf_thresh:.2f}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

        return frame_with_tracks

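    # draw_tracks assumes the local bytetrack module exposes STrack-style
    # fields: track_id, tlbr (x1, y1, x2, y2 in pixels), score, is_activated,
    # and optionally a history mapping of past boxes.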
    def draw_tracks(self, frame, tracks):
        """Draw tracking results on a copy of the frame."""
        frame_with_tracks = frame.copy()

        for track in tracks:
            # Skip tracks that have not been confirmed yet.
            if not track.is_activated:
                continue

            track_id = track.track_id
            bbox = track.tlbr
            score = track.score

            # Stable per-ID color.
            color = self.colors[track_id % len(self.colors)]

            x1, y1, x2, y2 = map(int, bbox)
            cv2.rectangle(frame_with_tracks, (x1, y1), (x2, y2), color, 2)

            # Filled label background, then white text on top.
            label = f"ID:{track_id} ({score:.2f})"
            label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
            cv2.rectangle(frame_with_tracks,
                          (x1, y1 - label_size[1] - 10),
                          (x1 + label_size[0], y1),
                          color, -1)
            cv2.putText(frame_with_tracks, label,
                        (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # Motion trail through the last 10 box centers, if available.
            if hasattr(track, 'history') and len(track.history) > 1:
                points = []
                for hist_bbox in list(track.history.values())[-10:]:
                    center_x = int((hist_bbox[0] + hist_bbox[2]) / 2)
                    center_y = int((hist_bbox[1] + hist_bbox[3]) / 2)
                    points.append((center_x, center_y))

                for i in range(1, len(points)):
                    cv2.line(frame_with_tracks, points[i - 1], points[i], color, 2)

            # Current box center.
            center_x = int((x1 + x2) / 2)
            center_y = int((y1 + y2) / 2)
            cv2.circle(frame_with_tracks, (center_x, center_y), 3, color, -1)

        return frame_with_tracks

    def run(self):
        """Run the webcam detection and tracking loop."""
        print("Starting detection with tracking...")
        print("Controls:")
        print("  'q': Quit")
        print("  'd': Toggle debug view")
        print("  '+': Increase confidence threshold")
        print("  '-': Decrease confidence threshold")
        print("  'r': Reset tracker")

        self.last_time = time.time()

        while True:
            ret, frame = self.cap.read()
            if not ret:
                break

            frame_with_tracks = self.process_frame(frame)
            cv2.imshow('HarpoonNet Detection + Tracking', frame_with_tracks)

            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
            elif key == ord('d'):
                self.debug_view = not self.debug_view
            elif key == ord('+'):
                self.conf_thresh = min(1.0, self.conf_thresh + 0.05)
                print(f"Confidence threshold: {self.conf_thresh:.2f}")
            elif key == ord('-'):
                self.conf_thresh = max(0.05, self.conf_thresh - 0.05)
                print(f"Confidence threshold: {self.conf_thresh:.2f}")
            elif key == ord('r'):
                # Discard all track state by rebuilding the tracker.
                self.tracker = BYTETracker(
                    frame_rate=30,
                    track_thresh=0.7,
                    track_buffer=30,
                    match_thresh=0.8
                )
                print("Tracker reset")

        self.cap.release()
        cv2.destroyAllWindows()

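# The detector can also be driven without the CLI below (sketch; the
# checkpoint path is a placeholder):
#
#   detector = WebcamDetectorWithTracking('path/to/checkpoint.pth')
#   detector.run()
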
def main():
    """Parse arguments and launch the webcam detector."""
    parser = argparse.ArgumentParser(description='HarpoonNet Webcam Detection with Tracking')
    parser.add_argument('--model', type=str, required=True, help='Path to model file')
    parser.add_argument('--device', type=str, default=None, help='Device to use (cuda/cpu)')

    args = parser.parse_args()

    if args.device:
        device = torch.device(args.device)
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    print(f"Using device: {device}")

    try:
        detector = WebcamDetectorWithTracking(args.model, device)
        detector.run()
    except KeyboardInterrupt:
        print("\nDetection stopped by user")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == '__main__':
    main()