"""
HarpoonNet Webcam Detection Script

Real-time drone detection using a webcam feed.
"""

import argparse
import sys
import time

import cv2
import numpy as np
import torch
from PIL import Image
from torchvision import transforms

from harpoon_modular import HarpoonNetModular

class WebcamDetector:
    def __init__(self, model_path, conf_thresh=0.6, nms_thresh=0.4, camera_id=0, flip_frame=True):
        """Initialize the webcam detector: load the model and open the camera."""
        self.model_path = model_path
        self.conf_thresh = conf_thresh
        self.nms_thresh = nms_thresh
        self.flip_frame = flip_frame
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.debug_view = False
        self.last_time = None

        # ImageNet normalization statistics; must match the training pipeline
        self.normalize_mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
        self.normalize_std = np.array([0.229, 0.224, 0.225], dtype=np.float32)

        print(f"🔄 Loading model from {model_path}")
        checkpoint = torch.load(model_path, map_location=self.device)
        self.model = HarpoonNetModular(num_classes=1, num_anchors=3)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.model.to(self.device)
        self.model.eval()
        print("✅ Model loaded successfully")

        self.cap = cv2.VideoCapture(camera_id)
        if not self.cap.isOpened():
            raise RuntimeError(f"Failed to open camera {camera_id}")

        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"📹 Camera resolution: {self.frame_width}x{self.frame_height}")
        print(f"🎯 Initial confidence threshold: {self.conf_thresh:.2f}")

    def enhance_frame(self, frame):
        """Enhance a frame for better detection (currently not called in the main loop)."""
        # CLAHE on the L channel in LAB space boosts local contrast
        lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
        l, a, b = cv2.split(lab)

        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
        cl = clahe.apply(l)

        limg = cv2.merge((cl, a, b))
        enhanced = cv2.cvtColor(limg, cv2.COLOR_LAB2BGR)

        # Global contrast (alpha) and brightness (beta) adjustment
        alpha = 1.3
        beta = 10
        enhanced = cv2.convertScaleAbs(enhanced, alpha=alpha, beta=beta)

        return enhanced

    def preprocess_frame(self, frame):
        """Letterbox, normalize, and batch a frame for model input."""
        orig_h, orig_w = frame.shape[:2]

        # Scale to fit a 320x320 square while preserving aspect ratio
        input_size = 320
        scale = min(input_size / orig_w, input_size / orig_h)
        new_w = int(orig_w * scale)
        new_h = int(orig_h * scale)

        resized = cv2.resize(frame, (new_w, new_h))

        # Center the resized frame on a black square canvas
        square = np.zeros((input_size, input_size, 3), dtype=np.uint8)
        x_offset = (input_size - new_w) // 2
        y_offset = (input_size - new_h) // 2
        square[y_offset:y_offset + new_h, x_offset:x_offset + new_w] = resized

        # OpenCV delivers BGR; the model expects RGB
        img = cv2.cvtColor(square, cv2.COLOR_BGR2RGB)
        img = Image.fromarray(img)

        # Normalize with the ImageNet statistics defined in __init__
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=self.normalize_mean.tolist(), std=self.normalize_std.tolist()),
        ])
        img = transform(img).unsqueeze(0)

        return img, (scale, x_offset, y_offset)

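    # Worked example of the letterbox mapping above (illustrative numbers):
    # for a 640x480 camera frame, scale = min(320/640, 320/480) = 0.5, so the
    # frame is resized to 320x240 and padded with x_offset = 0, y_offset = 40.
    # A detection at x_det in the 320x320 input maps back to the frame via
    # x = (x_det - x_offset) / scale, which is what postprocess_detections
    # below computes.
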
    def postprocess_detections(self, detections, preprocess_info, frame_shape):
        """Map boxes from letterboxed model-input coordinates back to frame coordinates."""
        scale, x_offset, y_offset = preprocess_info
        orig_h, orig_w = frame_shape[:2]
        processed_detections = []

        for det in detections[0]['boxes']:
            if len(det) == 4:
                # Undo the letterbox padding and scaling
                x1 = (det[0] - x_offset) / scale
                y1 = (det[1] - y_offset) / scale
                x2 = (det[2] - x_offset) / scale
                y2 = (det[3] - y_offset) / scale

                # Clamp to the frame bounds
                x1 = np.clip(x1, 0, orig_w)
                y1 = np.clip(y1, 0, orig_h)
                x2 = np.clip(x2, 0, orig_w)
                y2 = np.clip(y2, 0, orig_h)

                # Reject implausible boxes: tiny slivers and near-full-frame hits
                w = x2 - x1
                h = y2 - y1
                if w > 10 and h > 10 and w < orig_w * 0.9 and h < orig_h * 0.9:
                    processed_detections.append([int(x1), int(y1), int(x2), int(y2)])

        return processed_detections

    def process_frame(self, frame):
        """Run detection on a single frame and return an annotated copy."""
        img, preprocess_info = self.preprocess_frame(frame)
        img = img.to(self.device)

        with torch.no_grad():
            predictions = self.model(img)
            detections = self.model.decode_predictions(predictions, confidence_threshold=self.conf_thresh)

        boxes = self.postprocess_detections(detections, preprocess_info, frame.shape)

        frame_with_boxes = frame.copy()
        for box in boxes:
            x1, y1, x2, y2 = box
            cv2.rectangle(frame_with_boxes, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # Label the first surviving box with the top confidence score
        # (guarding on `boxes` avoids a NameError when no box passed filtering)
        if boxes and len(detections[0]['scores']) > 0:
            conf = float(detections[0]['scores'][0])
            x1, y1 = boxes[0][0], boxes[0][1]
            cv2.putText(frame_with_boxes, f"Drone: {conf:.2f}",
                        (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Instantaneous FPS from the time since the previous frame
        now = time.time()
        fps = 1.0 / (now - self.last_time) if self.last_time else 0.0
        self.last_time = now

        cv2.putText(frame_with_boxes, f"FPS: {fps:.1f} | Detected: {len(boxes)} | Conf: {self.conf_thresh:.2f}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

        return frame_with_boxes

    def draw_detections(self, frame, detections):
        """Draw detection boxes and labels on a frame.

        Alternative renderer, not called from run(); expects a list of
        dicts with 'box' and 'confidence' keys.
        """
        for det in detections:
            x1, y1, x2, y2 = det['box']
            conf = det['confidence']

            # Brighter green for higher confidence
            color = (0, int(255 * conf), 0)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

            label = f"Drone: {conf:.2f}"
            cv2.putText(frame, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        if hasattr(self, 'fps'):
            cv2.putText(frame, f"FPS: {self.fps:.1f} | Detected: {len(detections)}",
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        if self.debug_view:
            cv2.putText(frame, "Debug View: ON", (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            cv2.putText(frame, f"Conf Thresh: {self.conf_thresh:.2f}", (10, 90),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

            # Center crosshair to help aim the camera
            h, w = frame.shape[:2]
            cv2.line(frame, (w // 2, 0), (w // 2, h), (0, 0, 255), 1)
            cv2.line(frame, (0, h // 2), (w, h // 2), (0, 0, 255), 1)

        return frame

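    # Hypothetical wiring for draw_detections above (the pairing below is an
    # assumption, not part of the current flow): zip the filtered boxes with
    # the decoded scores and render them in one call, e.g. inside process_frame:
    #   dets = [{'box': b, 'confidence': float(s)}
    #           for b, s in zip(boxes, detections[0]['scores'])]
    #   frame_with_boxes = self.draw_detections(frame.copy(), dets)
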
    def run(self):
        """Run the webcam detection loop until 'q' is pressed or the feed ends."""
        print("🎥 Starting detection...")
        print("Controls:")
        print("  'q': Quit")
        print("  'd': Toggle debug view")
        print("  '+': Increase confidence threshold")
        print("  '-': Decrease confidence threshold")

        self.last_time = time.time()

        while True:
            ret, frame = self.cap.read()
            if not ret:
                break

            # Mirror the frame so on-screen motion matches the user's movement
            if self.flip_frame:
                frame = cv2.flip(frame, 1)

            frame_with_boxes = self.process_frame(frame)
            cv2.imshow('HarpoonNet Detection', frame_with_boxes)

            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
            elif key == ord('d'):
                self.debug_view = not self.debug_view
            elif key == ord('+'):
                self.conf_thresh = min(1.0, self.conf_thresh + 0.05)
                print(f"Confidence threshold: {self.conf_thresh:.2f}")
            elif key == ord('-'):
                self.conf_thresh = max(0.05, self.conf_thresh - 0.05)
                print(f"Confidence threshold: {self.conf_thresh:.2f}")

        self.cap.release()
        cv2.destroyAllWindows()

def main():
    parser = argparse.ArgumentParser(description='HarpoonNet Webcam Detection')
    parser.add_argument('--model', type=str, required=True,
                        help='Path to model checkpoint')
    parser.add_argument('--conf', type=float, default=0.6,
                        help='Initial confidence threshold')
    parser.add_argument('--nms', type=float, default=0.4,
                        help='NMS threshold')
    parser.add_argument('--camera', type=int, default=0,
                        help='Camera device ID')
    parser.add_argument('--no-flip', action='store_true',
                        help='Disable horizontal frame flipping')

    args = parser.parse_args()

    try:
        detector = WebcamDetector(
            model_path=args.model,
            conf_thresh=args.conf,
            nms_thresh=args.nms,
            camera_id=args.camera,
            flip_frame=not args.no_flip,
        )
        detector.run()
    except Exception as e:
        print(f"❌ Error: {e}")
        return 1

    return 0


if __name__ == '__main__':
    sys.exit(main())