import numpy as np from typing import List, Dict, Tuple, Optional from scipy.optimize import linear_sum_assignment import yaml import cv2 import hashlib import torch import torch.nn as nn from PIL import Image # DINOv2 availability check DINOV2_AVAILABLE = False try: # Try loading DINOv2 from torch.hub first torch.hub._validate_not_a_forked_repo = lambda a, b, c: True # Allow loading from hub DINOV2_AVAILABLE = True print("✓ DINOv2 will be loaded when needed") except Exception as e: print(f"⚠ DINOv2 preparation failed: {e}") # Fallback: Try transformers library try: from transformers import AutoImageProcessor, AutoModel DINOV2_AVAILABLE = True print("✓ DINOv2 available via transformers library") except ImportError: print("DINOv2 not available. Using traditional features only.") print(" Install with: pip install transformers torch") # Global DINOv2 model and processor _GLOBAL_DINOV2_MODEL = None _GLOBAL_DINOV2_PROCESSOR = None _GLOBAL_DINOV2_TRANSFORM = None def get_dinov2_model(): """Get or initialize global DINOv2 model""" global _GLOBAL_DINOV2_MODEL, _GLOBAL_DINOV2_PROCESSOR, _GLOBAL_DINOV2_TRANSFORM if _GLOBAL_DINOV2_MODEL is None and DINOV2_AVAILABLE: try: device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Try torch.hub first (preferred method) try: # Load DINOv2 small model (you can change to vitb14, vitl14, or vitg14 for larger models) # vits14 = small (85M params), vitb14 = base (307M), vitl14 = large (1B), vitg14 = giant (1.8B) model_name = 'dinov2_vits14' # Using small model for speed _GLOBAL_DINOV2_MODEL = torch.hub.load('facebookresearch/dinov2', model_name) _GLOBAL_DINOV2_MODEL.to(device) _GLOBAL_DINOV2_MODEL.eval() # Disable gradient computation for inference for param in _GLOBAL_DINOV2_MODEL.parameters(): param.requires_grad = False # Create transform for DINOv2 import torchvision.transforms as T _GLOBAL_DINOV2_TRANSFORM = T.Compose([ T.Resize(256, interpolation=T.InterpolationMode.BICUBIC), T.CenterCrop(224), T.ToTensor(), T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ]) print(f"✓ DINOv2 model loaded: {model_name} via torch.hub") except Exception as hub_error: print(f"Torch.hub failed, trying transformers: {hub_error}") # Fallback to transformers library from transformers import AutoImageProcessor, AutoModel model_name = "facebook/dinov2-small" # Or dinov2-base, dinov2-large, dinov2-giant _GLOBAL_DINOV2_PROCESSOR = AutoImageProcessor.from_pretrained(model_name) _GLOBAL_DINOV2_MODEL = AutoModel.from_pretrained(model_name).to(device) _GLOBAL_DINOV2_MODEL.eval() for param in _GLOBAL_DINOV2_MODEL.parameters(): param.requires_grad = False print(f"✓ DINOv2 model loaded: {model_name} via transformers") except Exception as e: print(f"⚠ DINOv2 loading failed: {e}. Using fallback features.") _GLOBAL_DINOV2_MODEL = None _GLOBAL_DINOV2_PROCESSOR = None _GLOBAL_DINOV2_TRANSFORM = None return _GLOBAL_DINOV2_MODEL, _GLOBAL_DINOV2_PROCESSOR, _GLOBAL_DINOV2_TRANSFORM class DamageComparator: """Enhanced damage comparator with DINOv2-based view-invariant re-identification""" def __init__(self, config_path: str = "config.yaml"): """Initialize comparator with configuration""" with open(config_path, 'r') as f: self.config = yaml.safe_load(f) self.iou_threshold = self.config['comparison']['iou_match_threshold'] self.position_tolerance = self.config['comparison']['position_tolerance'] self.alpha = self.config.get("comparison", {}).get("alpha", 0.6) self.beta = self.config.get("comparison", {}).get("beta", 0.3) self.gamma = self.config.get("comparison", {}).get("gamma", 0.1) self.combined_score_threshold = self.config.get("comparison", {}).get("combined_score_threshold", 0.5) # Device selection self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Get global DINOv2 model self.dinov2_model, self.dinov2_processor, self.dinov2_transform = get_dinov2_model() # ReID thresholds (DINOv2 typically needs different thresholds than CLIP) self.reid_similarity_threshold = 0.7 # Slightly higher for DINOv2 self.feature_cache = {} # DINOv2 feature dimension (depends on model size) self.dinov2_feature_dim = 384 if 'vits' in str(self.dinov2_model.__class__) else 768 def calculate_iou(self, box1: List[int], box2: List[int]) -> float: """Calculate Intersection over Union between two boxes""" x1 = max(box1[0], box2[0]) y1 = max(box1[1], box2[1]) x2 = min(box1[2], box2[2]) y2 = min(box1[3], box2[3]) if x2 < x1 or y2 < y1: return 0.0 intersection = (x2 - x1) * (y2 - y1) box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) union = box1_area + box2_area - intersection if union == 0: return 0.0 return intersection / union def extract_damage_features(self, image: np.ndarray, bbox: List[int]) -> np.ndarray: """ Extract view-invariant features for damage ReID using DINOv2 Args: image: Full image (BGR format from OpenCV) bbox: [x1, y1, x2, y2] bounding box Returns: Feature vector for ReID """ x1, y1, x2, y2 = bbox # Ensure valid bbox x1, y1 = max(0, x1), max(0, y1) x2, y2 = min(image.shape[1], x2), min(image.shape[0], y2) damage_roi = image[y1:y2, x1:x2] if damage_roi.size == 0: return np.zeros(256) # Return zero vector for invalid ROI features_list = [] # 1. DINOv2 features (if available) - Most powerful for ReID if self.dinov2_model is not None: dinov2_features = self._extract_dinov2_features(damage_roi) features_list.append(dinov2_features) # 2. Geometric invariant features (always available) geometric_features = self._extract_geometric_features(damage_roi) features_list.append(geometric_features) # 3. Texture features texture_features = self._extract_texture_features(damage_roi) features_list.append(texture_features) # 4. Context features (position on car) context_features = self._extract_context_features(image, bbox) features_list.append(context_features) # Concatenate and normalize combined_features = np.concatenate(features_list, axis=0) # L2 normalization for cosine similarity norm = np.linalg.norm(combined_features) if norm > 0: combined_features = combined_features / norm return combined_features def _extract_dinov2_features(self, roi: np.ndarray) -> np.ndarray: """Extract DINOv2 vision features - superior to CLIP for visual tasks""" try: # Convert BGR to RGB roi_rgb = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB) # Convert to PIL Image roi_pil = Image.fromarray(roi_rgb) # CRITICAL: Always use no_grad context to save memory with torch.no_grad(): if self.dinov2_transform is not None: # Using torch.hub version # Apply transforms roi_tensor = self.dinov2_transform(roi_pil).unsqueeze(0).to(self.device) # Extract features features = self.dinov2_model(roi_tensor) # DINOv2 returns [batch_size, num_patches+1, feature_dim] # We use the [CLS] token (first token) as the global feature if len(features.shape) == 3: features = features[:, 0, :] # Get CLS token # Move to CPU immediately to free VRAM features = features.cpu().numpy().flatten() # Clear CUDA cache if using GPU if self.device.type == 'cuda': torch.cuda.empty_cache() elif self.dinov2_processor is not None: # Using transformers version inputs = self.dinov2_processor(images=roi_pil, return_tensors="pt") inputs = {k: v.to(self.device) for k, v in inputs.items()} outputs = self.dinov2_model(**inputs) # Use pooler_output if available, otherwise use last_hidden_state if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None: features = outputs.pooler_output else: # Use CLS token from last hidden state features = outputs.last_hidden_state[:, 0, :] # Move to CPU immediately to free VRAM features = features.cpu().numpy().flatten() # Clear CUDA cache if using GPU if self.device.type == 'cuda': torch.cuda.empty_cache() else: return np.zeros(128) gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) edges = cv2.Canny(gray, 50, 150) edge_density = np.sum(edges > 0) / edges.size # Combine với DINOv2 features features = np.concatenate([features[:120], [edge_density * 100] * 8]) return features[:128] # Take first 128 dimensions for consistency except Exception as e: print(f"DINOv2 feature extraction error: {e}") return np.zeros(128) def _extract_geometric_features(self, roi: np.ndarray) -> np.ndarray: """Extract geometric invariant features (Hu moments)""" features = [] gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) # Hu moments - invariant to rotation, scale, translation try: moments = cv2.moments(gray) hu_moments = cv2.HuMoments(moments).flatten() # Log transform for stability hu_moments = -np.sign(hu_moments) * np.log10(np.abs(hu_moments) + 1e-10) features.extend(hu_moments[:7]) except: features.extend([0] * 7) # Shape features edges = cv2.Canny(gray, 50, 150) contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if contours: largest_contour = max(contours, key=cv2.contourArea) area = cv2.contourArea(largest_contour) perimeter = cv2.arcLength(largest_contour, True) if perimeter > 0: circularity = 4 * np.pi * area / (perimeter ** 2) features.append(circularity) else: features.append(0) # Aspect ratio x, y, w, h = cv2.boundingRect(largest_contour) aspect_ratio = w / h if h > 0 else 1 features.append(aspect_ratio) else: features.extend([0, 0]) return np.array(features) def _extract_texture_features(self, roi: np.ndarray) -> np.ndarray: """Extract texture features using gradient statistics""" gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) # Resize to fixed size for consistency gray_resized = cv2.resize(gray, (32, 32)) # Simple texture statistics features = [] features.append(np.mean(gray_resized)) features.append(np.std(gray_resized)) # Gradient features dx = cv2.Sobel(gray_resized, cv2.CV_64F, 1, 0, ksize=3) dy = cv2.Sobel(gray_resized, cv2.CV_64F, 0, 1, ksize=3) features.append(np.mean(np.abs(dx))) features.append(np.mean(np.abs(dy))) features.append(np.std(dx)) features.append(np.std(dy)) return np.array(features) def _extract_context_features(self, image: np.ndarray, bbox: List[int]) -> np.ndarray: """Extract context features (position on car)""" h, w = image.shape[:2] x1, y1, x2, y2 = bbox # Normalized position cx = (x1 + x2) / 2 / w cy = (y1 + y2) / 2 / h width_ratio = (x2 - x1) / w height_ratio = (y2 - y1) / h # Position indicators is_left = cx < 0.33 is_center = 0.33 <= cx <= 0.67 is_right = cx > 0.67 is_top = cy < 0.4 is_middle = 0.4 <= cy <= 0.7 is_bottom = cy > 0.7 features = [ cx, cy, width_ratio, height_ratio, float(is_left), float(is_center), float(is_right), float(is_top), float(is_middle), float(is_bottom) ] return np.array(features) def compute_match_score(self, box1, box2, reid_sim, alpha=0.6, beta=0.3, gamma=0.1): """ Weighted score combining ReID, IoU, and position alpha, beta, gamma = weights """ # IoU iou = self.calculate_iou(box1, box2) # Tính khoảng cách tâm cx1, cy1 = (box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2 cx2, cy2 = (box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2 dist = ((cx1 - cx2) ** 2 + (cy1 - cy2) ** 2) ** 0.5 # Convert distance → score (0 → 1) pos_score = max(0, 1 - dist / self.position_tolerance) # Weighted score return alpha * reid_sim + beta * iou + gamma * pos_score def match_damages_with_reid(self, detections1: Dict, detections2: Dict, image1: Optional[np.ndarray] = None, image2: Optional[np.ndarray] = None) -> Dict: """ Enhanced damage matching with DINOv2 ReID capability Args: detections1, detections2: Detection results image1, image2: Original images for feature extraction Returns: Matching results with ReID """ boxes1 = detections1['boxes'] boxes2 = detections2['boxes'] print(f"\n🔍 DEBUG match_damages_with_reid (DINOv2):") print(f" Boxes1: {len(boxes1)}, Boxes2: {len(boxes2)}") print(f" Images provided: {image1 is not None and image2 is not None}") print(f" DINOv2 available: {self.dinov2_model is not None}") if len(boxes1) == 0 and len(boxes2) == 0: return { 'matched_pairs': [], 'unmatched_before': [], 'unmatched_after': [], 'iou_matrix': None, 'reid_scores': None } if len(boxes1) == 0: return { 'matched_pairs': [], 'unmatched_before': [], 'unmatched_after': list(range(len(boxes2))), 'iou_matrix': None, 'reid_scores': None } if len(boxes2) == 0: return { 'matched_pairs': [], 'unmatched_before': list(range(len(boxes1))), 'unmatched_after': [], 'iou_matrix': None, 'reid_scores': None } # Calculate IoU matrix (traditional matching) iou_matrix = np.zeros((len(boxes1), len(boxes2))) for i, box1 in enumerate(boxes1): for j, box2 in enumerate(boxes2): iou_matrix[i, j] = self.calculate_iou(box1, box2) # Calculate ReID similarity matrix if images provided reid_matrix = None if image1 is not None and image2 is not None: reid_matrix = np.zeros((len(boxes1), len(boxes2))) print(" Extracting DINOv2 features for damage matching...") # Extract features for all boxes features1 = [self.extract_damage_features(image1, box) for box in boxes1] features2 = [self.extract_damage_features(image2, box) for box in boxes2] # Calculate cosine similarity for i, feat1 in enumerate(features1): for j, feat2 in enumerate(features2): reid_matrix[i, j] = np.dot(feat1, feat2) # Already normalized print(f" DINOv2 features extracted successfully") if reid_matrix is not None: combined_matrix = np.zeros_like(reid_matrix) for i, box1 in enumerate(boxes1): for j, box2 in enumerate(boxes2): combined_matrix[i, j] = self.compute_match_score( box1, box2, reid_matrix[i, j], alpha=0.6, beta=0.3, gamma=0.1 ) else: combined_matrix = iou_matrix # Hungarian algorithm for optimal matching cost_matrix = 1 - combined_matrix row_indices, col_indices = linear_sum_assignment(cost_matrix) # Filter matches by threshold matched_pairs = [] matched_rows = set() matched_cols = set() # Use different threshold based on whether ReID is available threshold = self.combined_score_threshold if reid_matrix is not None else self.iou_threshold # Dòng ~560 trong comparison.py # Trong match_damages_with_reid, dòng ~560 # Trong match_damages_with_reid, dòng 556-571 for i, j in zip(row_indices, col_indices): score = combined_matrix[i, j] iou_score = iou_matrix[i, j] # Logic mới - phù hợp với config thực tế if iou_score >= self.iou_threshold: # >= 0.35 từ config # Good IoU - dùng threshold thấp threshold_to_use = self.combined_score_threshold # 0.3 elif iou_score > 0.1: # Medium IoU - threshold vừa phải threshold_to_use = 0.45 elif iou_score > 0.05: # Low IoU - cần score cao hơn threshold_to_use = 0.55 else: # Very low IoU - cần ReID rất tốt threshold_to_use = 0.65 print(f" Pair ({i},{j}): IoU={iou_score:.3f}, Score={score:.3f}, Threshold={threshold_to_use:.3f}") if score >= threshold_to_use: if detections1['classes'][i] == detections2['classes'][j]: matched_pairs.append((i, j, score)) matched_rows.add(i) matched_cols.add(j) # Find unmatched damages unmatched_before = [i for i in range(len(boxes1)) if i not in matched_rows] unmatched_after = [j for j in range(len(boxes2)) if j not in matched_cols] print(f" IoU matrix max: {iou_matrix.max():.3f}") print(f" Combined score max: {combined_matrix.max():.3f}") print(f" Matched pairs: {len(matched_pairs)}") return { 'matched_pairs': matched_pairs, 'unmatched_before': unmatched_before, 'unmatched_after': unmatched_after, 'iou_matrix': iou_matrix.tolist(), 'reid_scores': reid_matrix.tolist() if reid_matrix is not None else None } def match_damages(self, detections1: Dict, detections2: Dict) -> Dict: """ Original matching method (backward compatibility) """ return self.match_damages_with_reid(detections1, detections2, None, None) def analyze_damage_status(self, before_detections: Dict, after_detections: Dict, before_image: Optional[np.ndarray] = None, after_image: Optional[np.ndarray] = None) -> Dict: """ Enhanced damage analysis with DINOv2 ReID support """ # Use enhanced matching with ReID if images provided matching = self.match_damages_with_reid( before_detections, after_detections, before_image, after_image ) # Extract damage information matched_damages = [] for i, j, score in matching['matched_pairs']: matched_damages.append({ 'type': before_detections['classes'][i], 'confidence_before': float(before_detections['confidences'][i]), 'confidence_after': float(after_detections['confidences'][j]), 'box_before': before_detections['boxes'][i], 'box_after': after_detections['boxes'][j], 'matching_score': float(score), 'is_same_damage': bool(score > self.reid_similarity_threshold) }) existing_damages = [] for i in matching['unmatched_before']: existing_damages.append({ 'type': before_detections['classes'][i], 'confidence': float(before_detections['confidences'][i]), 'box': before_detections['boxes'][i] }) new_damages = [] for j in matching['unmatched_after']: new_damages.append({ 'type': after_detections['classes'][j], 'confidence': float(after_detections['confidences'][j]), 'box': after_detections['boxes'][j] }) # Determine case case = self._determine_case(matched_damages, existing_damages, new_damages) return { 'case': case['type'], 'message': case['message'], 'matched_damages': matched_damages, 'repaired_damages': existing_damages, 'new_damages': new_damages, 'statistics': { 'total_before': len(before_detections['boxes']), 'total_after': len(after_detections['boxes']), 'matched': len(matched_damages), 'repaired': len(existing_damages), 'new': len(new_damages), 'using_reid': bool(before_image is not None and after_image is not None), 'reid_model': 'DINOv2' if self.dinov2_model is not None else 'Traditional' } } def _determine_case(self, matched: List, repaired: List, new: List) -> Dict: """Determine which case the comparison falls into""" # Case 3: Happy case - no damages at all if len(matched) == 0 and len(repaired) == 0 and len(new) == 0: return { 'type': 'CASE_3_SUCCESS', 'message': 'Successful delivery - No damage detected' } # Case 1: Existing damages remain if len(matched) > 0 and len(new) == 0: return { 'type': 'CASE_1_EXISTING', 'message': 'Error from the beginning, not during delivery -> Delivery completed' } # Case 2: New damages detected if len(new) > 0: return { 'type': 'CASE_2_NEW_DAMAGE', 'message': 'Delivery Defect - New Damage Discovered' } # Special case: All damages repaired if len(repaired) > 0 and len(new) == 0 and len(matched) == 0: return { 'type': 'CASE_REPAIRED', 'message': 'All damage repaired - Vehicle delivered successfully' } return { 'type': 'CASE_UNKNOWN', 'message': 'Status Undetermined' } def deduplicate_detections_across_views(self, detections_list: List[Dict], images_list: List[np.ndarray]) -> Dict: """ Deduplicate damages across multiple views using DINOv2 features Args: detections_list: List of detections from different views images_list: List of corresponding images Returns: Unique damages with their appearances in different views """ all_damages = [] print(f"Deduplicating damages across {len(images_list)} views using DINOv2...") # Collect all damages with their features for view_idx, (detections, image) in enumerate(zip(detections_list, images_list)): for i, bbox in enumerate(detections['boxes']): features = self.extract_damage_features(image, bbox) all_damages.append({ 'view_idx': view_idx, 'bbox': bbox, 'class': detections['classes'][i], 'confidence': detections['confidences'][i], 'features': features }) # Group similar damages groups = [] used = set() for i, damage1 in enumerate(all_damages): if i in used: continue group = [damage1] used.add(i) for j, damage2 in enumerate(all_damages): if j in used or damage1['view_idx'] == damage2['view_idx']: continue # Calculate similarity using DINOv2 features similarity = np.dot(damage1['features'], damage2['features']) # DINOv2 typically needs slightly higher threshold if similarity > self.reid_similarity_threshold: # Check class consistency if damage1['class'] == damage2['class']: group.append(damage2) used.add(j) groups.append(group) # Create unique damage IDs unique_damages = {} for group_idx, group in enumerate(groups): # Generate consistent ID based on features feature_hash = hashlib.md5( group[0]['features'].tobytes() ).hexdigest()[:8] damage_id = f"DMG_{feature_hash}" unique_damages[damage_id] = { 'views': [d['view_idx'] for d in group], 'class': group[0]['class'], 'avg_confidence': np.mean([d['confidence'] for d in group]), 'detections': group, 'reid_model': 'DINOv2' } print(f"Identified {len(unique_damages)} unique damages from {len(all_damages)} total detections") return unique_damages