# app.py – YOLOv8 Dataset Quality Evaluator for Hugging Face Spaces
"""
Gradio application for evaluating the quality of YOLO‑format object‑detection
datasets exported from Roboflow (or any other labeling tool).

The app runs a configurable pipeline of automated checks and returns a
structured report plus visual artefacts that make it easy to spot problems.

Designed for **Hugging Face Spaces**:

* Keep the file name `app.py` (Spaces’ default entry‑point).
* Add a `requirements.txt` (see README) so Spaces installs the right deps.
* The app binds to `0.0.0.0` and picks up the port from the `PORT` env var
  (set by Spaces).

Checks implemented
------------------
1. **Dataset integrity** – verify that every image has a label file (or an
   allowed empty‑label exemption) and that each label file parses correctly.
2. **Class stats / balance** – count instances per class and per‑image
   instance distribution.
3. **Image quality** – flag blurry, too‑dark or over‑bright images using
   simple OpenCV heuristics.
4. **Duplicate & near‑duplicate images** – perceptual‑hash pass (fallback) or
   FastDup if available.
5. **Duplicate boxes** – IoU > 0.9 duplicates in the same image.
6. **Optional model‑assisted label QA** – if the user provides a YOLO weights
   file, run inference and compute IoU‑based agreement metrics plus Cleanlab
   label‑quality scores when the library is installed.
7. **Composite scoring** – combine sub‑scores (with adjustable weights) into a
   final 0‑100 quality score.

The code is intentionally modular: each check lives in its own function that
returns a `dict` of metrics; adding new checks is as simple as creating another
function that follows the same signature and adding it to the `CHECKS` list.
"""
from __future__ import annotations

import json
import os
import shutil
import tempfile
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed

import gradio as gr
import numpy as np
import pandas as pd
import yaml
from PIL import Image
from tqdm import tqdm

# `imghdr` was deprecated in Python 3.11 and removed in 3.13 (PEP 594); keep
# using it where it exists and fall back to Pillow's format sniffing otherwise.
try:
    import imghdr
except ImportError:
    imghdr = None  # type: ignore[assignment]

# Optional imports (wrapped so the app still works without them)
try:
    import cv2  # type: ignore
except ImportError:
    cv2 = None  # pragma: no cover

try:
    import imagehash  # type: ignore
except ImportError:
    imagehash = None  # pragma: no cover

try:
    from ultralytics import YOLO  # type: ignore
except ImportError:
    YOLO = None  # noqa: N806

try:
    from cleanlab.object_detection import rank as cl_rank  # type: ignore
except ImportError:
    cl_rank = None

FASTDUP_AVAILABLE = False  # lazy‑loaded if requested


# --------------------------------------------------------------------------------------
# Utility dataclasses
# --------------------------------------------------------------------------------------
@dataclass
class ImageMetrics:
    """Per-image measurements: location, pixel size and optional quality scores."""

    path: Path
    width: int
    height: int
    blur_score: float | None = None
    brightness: float | None = None

    @property
    def aspect_ratio(self) -> float:
        """Return width / height, or 0.0 when the height is zero."""
        return self.width / self.height if self.height else 0.0


@dataclass
class DuplicateGroup:
    """A set of image paths that share the same hash value."""

    hash_val: str
    paths: List[Path]


# --------------------------------------------------------------------------------------
# Core helpers
# --------------------------------------------------------------------------------------
def load_yaml(yaml_path: Path) -> Dict:
    """Parse *yaml_path* with ``yaml.safe_load`` and return its content.

    An empty document (for which ``safe_load`` yields ``None``) is returned as
    an empty dict so callers always receive a mapping-like value.
    """
    with yaml_path.open("r", encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


def parse_label_file(label_path: Path) -> List[Tuple[int, float, float, float, float]]:
    """Return list of (class_id, x_center, y_center, width, height).

    Blank lines are ignored. Any other line that does not consist of exactly
    five whitespace-separated fields raises ``ValueError``; so do fields that
    cannot be converted to ``int`` / ``float``.
    """
    entries: List[Tuple[int, float, float, float, float]] = []
    with label_path.open("r", encoding="utf-8") as f:
        for line in f:
            parts = line.strip().split()
            if not parts:
                # Trailing newlines / empty lines are not annotations.
                continue
            if len(parts) != 5:
                raise ValueError(f"Malformed label line in {label_path}: {line}")
            class_id, *coords = parts
            entries.append((int(class_id), *map(float, coords)))
    return entries


def guess_image_dirs(root: Path) -> List[Path]:
    """Return potential images sub‑directories under a Roboflow/YOLO export."""
    candidates = [
        root / "images",
        root / "train" / "images",
        root / "valid" / "images",
        root / "val" / "images",
        root / "test" / "images",
    ]
    return [p for p in candidates if p.is_dir()]


def _is_image_file(path: Path) -> bool:
    """Return True when *path* is a regular file whose header looks like an image."""
    # rglob("*.*") also yields directories whose name contains a dot; sniffing
    # those would raise instead of answering "not an image".
    if not path.is_file():
        return False
    try:
        if imghdr is not None:
            return imghdr.what(path) is not None
        with Image.open(path) as im:
            return im.format is not None
    except (OSError, ValueError):
        return False


def _label_path_for(img_path: Path) -> Path:
    """Map an image path to its label path by swapping the last ``images`` folder for ``labels``.

    ``.../images/a.jpg`` becomes ``.../labels/a.txt`` and
    ``.../images/sub/a.jpg`` becomes ``.../labels/sub/a.txt``.
    """
    parts = list(img_path.parts)
    # Walk the directory components right-to-left, skipping the file name.
    for idx in range(len(parts) - 2, -1, -1):
        if parts[idx] == "images":
            parts[idx] = "labels"
            return Path(*parts).with_name(f"{img_path.stem}.txt")
    # No "images" component: fall back to the sibling-"labels" layout.
    return img_path.parent.parent / "labels" / f"{img_path.stem}.txt"


def gather_dataset(root: Path, yaml_path: Path | None = None) -> Tuple[List[Path], List[Path], Dict]:
    """Return (image_paths, label_paths, yaml_dict).

    Args:
        root: Dataset root directory.
        yaml_path: Dataset YAML config; when ``None`` the first ``*.yaml``
            (or, failing that, ``*.yml``) file in *root* in sorted order is used.

    Returns:
        The sorted image paths, the label path expected for each image (same
        order; the files may or may not exist) and the parsed YAML content.

    Raises:
        FileNotFoundError: If no YAML config or no images directory is found.
    """
    if yaml_path is None:
        # Sorted so the choice does not depend on filesystem listing order.
        yaml_candidates = sorted(root.glob("*.yaml")) or sorted(root.glob("*.yml"))
        if not yaml_candidates:
            raise FileNotFoundError("Could not find a YAML config in dataset root; please supply explicitly.")
        yaml_path = yaml_candidates[0]
    meta = load_yaml(yaml_path)

    image_dirs = guess_image_dirs(root)
    if not image_dirs:
        raise FileNotFoundError("No images directory found under dataset root; expected images/ subfolder(s).")

    image_paths: List[Path] = sorted(p for d in image_dirs for p in d.rglob("*.*") if _is_image_file(p))
    # /images/img123.jpg -> /labels/img123.txt
    label_paths: List[Path] = [_label_path_for(img_path) for img_path in image_paths]
    return image_paths, label_paths, meta


# --------------------------------------------------------------------------------------
# Individual checks
# --------------------------------------------------------------------------------------
def _is_corrupt(img_path: Path) -> bool:
    """Return True when Pillow cannot open or verify *img_path*."""
    try:
        with Image.open(img_path) as im:
            im.verify()
        return False
    except Exception:  # noqa: BLE001
        return True


def check_integrity(image_paths: List[Path], label_paths: List[Path]) -> Dict:
    """Verify that images and labels exist and are readable.

    Args:
        image_paths: Image files of the dataset.
        label_paths: Expected label file for each image, in the same order.

    Returns:
        A dict with ``name``, ``score`` (0-100, one point share lost per
        problem relative to the number of images) and ``details`` listing
        images without a label file, label files without an image, label
        files that fail to parse, and images Pillow cannot verify.
    """
    missing_labels = [img for img, lbl in zip(image_paths, label_paths) if not lbl.exists()]

    # Orphan labels: *.txt files sitting in a labels directory that no image
    # maps to. (label_paths is derived from the images, so the label list
    # itself can never reveal a missing image.)
    expected_labels = set(label_paths)
    label_dirs = sorted({lbl.parent for lbl in label_paths})
    missing_images = [
        txt
        for label_dir in label_dirs
        if label_dir.is_dir()
        for txt in sorted(label_dir.glob("*.txt"))
        # classes.txt is a class-name list written by some labeling tools, not a label file.
        if txt not in expected_labels and txt.name != "classes.txt"
    ]

    # Label files that exist but cannot be parsed as YOLO annotations.
    malformed_labels = []
    for lbl in dict.fromkeys(label_paths):  # de-duplicate, keep order
        if not lbl.exists():
            continue
        try:
            parse_label_file(lbl)
        except ValueError:
            malformed_labels.append(lbl)

    # Parallel corruption check for speed on Spaces CPU boxes; Executor.map
    # yields results in input order, so the report is deterministic.
    with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as ex:
        flags = list(
            tqdm(ex.map(_is_corrupt, image_paths), total=len(image_paths), desc="Integrity", leave=False)
        )
    corrupt_images = [p for p, is_bad in zip(image_paths, flags) if is_bad]

    problems = len(missing_labels) + len(missing_images) + len(malformed_labels) + len(corrupt_images)
    score = 100 - problems / max(len(image_paths), 1) * 100
    return {
        "name": "Integrity",
        "score": max(score, 0),
        "details": {
            "missing_label_files": [str(p) for p in missing_labels],
            "missing_image_files": [str(p) for p in missing_images],
            "malformed_label_files": [str(p) for p in malformed_labels],
            "corrupt_images": [str(p) for p in corrupt_images],
        },
    }


def compute_class_stats(label_paths: List[Path]) -> Dict:
    """Count instances per class and boxes per image.

    Label files that do not exist are ignored; files that fail to parse are
    skipped and listed under ``details["skipped_label_files"]``. The score is
    ``min_class_count / max_class_count * 100`` over the classes that occur.
    """
    class_counts: Counter = Counter()
    boxes_per_image: List[int] = []
    skipped: List[Path] = []
    for lbl in label_paths:
        if not lbl.exists():
            continue
        try:
            boxes = parse_label_file(lbl)
        except ValueError:
            # One bad file should not abort the whole statistics pass.
            skipped.append(lbl)
            continue
        boxes_per_image.append(len(boxes))
        class_counts.update(b[0] for b in boxes)

    if not class_counts:
        return {"name": "Class balance", "score": 0, "details": {"message": "No labels found"}}

    max_count, min_count = max(class_counts.values()), min(class_counts.values())
    balance_score = min_count / max_count * 100 if max_count else 0
    return {
        "name": "Class balance",
        "score": balance_score,
        "details": {
            "class_counts": dict(class_counts),
            "boxes_per_image_stats": {
                "min": int(np.min(boxes_per_image) if boxes_per_image else 0),
                "max": int(np.max(boxes_per_image) if boxes_per_image else 0),
                "mean": float(np.mean(boxes_per_image) if boxes_per_image else 0),
            },
            "skipped_label_files": [str(p) for p in skipped],
        },
    }


def image_quality_metrics(
    image_paths: List[Path],
    blur_thresh: float = 100.0,
    dark_thresh: float = 25.0,
    bright_thresh: float = 230.0,
) -> Dict:
    """Flag blurry, too-dark and over-bright images.

    Args:
        image_paths: Images to inspect.
        blur_thresh: Images whose Laplacian variance is below this are blurry.
        dark_thresh: Images whose mean gray level is below this are dark.
        bright_thresh: Images whose mean gray level is above this are bright.

    Returns:
        A dict with ``name``, ``score`` (percentage of images not flagged) and
        ``details`` listing the flagged paths per category plus the images
        OpenCV could not read. When OpenCV is not installed the check is
        skipped and a score of 100 is returned.
    """
    if cv2 is None:
        return {"name": "Image quality", "score": 100, "details": {"message": "cv2 not installed – check skipped"}}

    blurry, dark, bright, unreadable = [], [], [], []
    for p in tqdm(image_paths, desc="Image quality", leave=False):
        img = cv2.imread(str(p))
        if img is None:
            # cv2.imread signals failure by returning None rather than raising.
            unreadable.append(p)
            continue
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        lap_var = cv2.Laplacian(gray, cv2.CV_64F).var()
        brightness = float(np.mean(gray))
        if lap_var < blur_thresh:
            blurry.append(p)
        if brightness < dark_thresh:
            dark.append(p)
        if brightness > bright_thresh:
            bright.append(p)

    # An image flagged in several categories counts once.
    flagged = {*blurry, *dark, *bright}
    score = 100 - len(flagged) / max(len(image_paths), 1) * 100
    return {
        "name": "Image quality",
        "score": score,
        "details": {
            "blurry": [str(p) for p in blurry],
            "dark": [str(p) for p in dark],
            "bright": [str(p) for p in bright],
            "unreadable": [str(p) for p in unreadable],
        },
    }


def detect_duplicates(image_paths: List[Path], use_fastdup: bool = False) -> Dict:
    if use_fastdup:
        global FASTDUP_AVAILABLE
        try:
            import fastdup  # type: ignore
            FASTDUP_AVAILABLE = True
        except ImportError:
            use_fastdup = False
    duplicate_groups: List[DuplicateGroup] = []
    if use_fastdup and FASTDUP_AVAILABLE and len(image_paths):
        fd = fastdup.create(input_dir=str(image_paths[0].parent.parent), work_dir="fastdup_work")
        fd.run(num_images=0)
        clusters = fd.clusters  # type: ignore[attr