"""
Enhanced Manga Panel Dataset Converter: extract frames as panels and prepare a GCN-ready dataset.

Crops each frame (panel) from manga pages and builds a dataset of the elements
(speech bubbles, faces, bodies) contained in each panel.
"""

import argparse
import difflib
import json
import math
import os
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple

import cv2
import numpy as np

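# Input format (a sketch): the converter expects Manga109-style annotations that
# have been converted from XML to JSON. Only the keys this module actually reads
# are shown below; coordinate values may be strings or ints depending on how the
# XML was converted, so the code casts them with int() before arithmetic.
#
# {
#     "title": "...",
#     "character": [{"@id": "...", "@name": "..."}, ...],
#     "page": [
#         {
#             "@index": "0",
#             "frame": [{"@id": "...", "@xmin": ..., "@ymin": ..., "@xmax": ..., "@ymax": ...}, ...],
#             "face":  [{"@id": "...", "@character": "...", "@xmin": ..., ...}, ...],
#             "body":  [{"@id": "...", "@character": "...", "@xmin": ..., ...}, ...],
#             "text":  [{"@id": "...", "@bodyid": "...", "#text": "...", "@xmin": ..., ...}, ...]
#         },
#         ...
#     ]
# }
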
class MangaPanelDatasetConverter:
    def __init__(self, fuzzy_mapping: bool = True, proximity_threshold: float = 200.0):
        self.fuzzy_mapping = fuzzy_mapping
        self.proximity_threshold = proximity_threshold
        self.character_mapping = {}
        self.stats = {
            'total_pages': 0,
            'total_panels': 0,
            'total_bubbles': 0,
            'total_faces': 0,
            'total_bodies': 0,
            'successful_links': 0,
            'failed_links': 0,
            'unique_characters': set(),
            'extracted_panels': 0
        }

    def load_manga_data(self, file_path: str) -> Dict[str, Any]:
        """Load manga data from JSON file."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def build_character_mapping(self, character_list: List[Dict]) -> Dict[str, str]:
        """Build character ID to name mapping from character list."""
        mapping = {}
        for char in character_list:
            mapping_id = char.get('@id', '')
            char_name = char.get('@name', '')
            if mapping_id and char_name:
                mapping[mapping_id] = char_name
        return mapping

    def check_overlap(self, bbox1: List[int], bbox2: List[int]) -> bool:
        """Check if two bounding boxes overlap."""
        x1_min, y1_min, x1_max, y1_max = bbox1
        x2_min, y2_min, x2_max, y2_max = bbox2

        return not (x1_max <= x2_min or x2_max <= x1_min or
                    y1_max <= y2_min or y2_max <= y1_min)

    def is_element_in_frame(self, element_bbox: List[int], frame_bbox: List[int],
                            overlap_threshold: float = 0.5) -> bool:
        """
        Check if an element (face, body, text) is within or significantly overlaps with a frame.

        Args:
            element_bbox: [xmin, ymin, xmax, ymax] of the element
            frame_bbox: [xmin, ymin, xmax, ymax] of the frame
            overlap_threshold: minimum fraction of the element's area that must lie
                inside the frame for the element to count as belonging to the frame
        """
        if not self.check_overlap(element_bbox, frame_bbox):
            return False

        # Intersection rectangle between the element and the frame.
        x_left = max(element_bbox[0], frame_bbox[0])
        y_top = max(element_bbox[1], frame_bbox[1])
        x_right = min(element_bbox[2], frame_bbox[2])
        y_bottom = min(element_bbox[3], frame_bbox[3])

        if x_right <= x_left or y_bottom <= y_top:
            return False

        intersection_area = (x_right - x_left) * (y_bottom - y_top)
        element_area = (element_bbox[2] - element_bbox[0]) * (element_bbox[3] - element_bbox[1])

        # Overlap is measured relative to the element's own area, not the frame's.
        overlap_ratio = intersection_area / element_area if element_area > 0 else 0
        return overlap_ratio >= overlap_threshold

    def adjust_coordinates_to_frame(self, element_bbox: List[int], frame_bbox: List[int]) -> List[int]:
        """Adjust element coordinates to be relative to the cropped frame."""
        frame_x_min, frame_y_min = frame_bbox[0], frame_bbox[1]

        adjusted_bbox = [
            element_bbox[0] - frame_x_min,
            element_bbox[1] - frame_y_min,
            element_bbox[2] - frame_x_min,
            element_bbox[3] - frame_y_min
        ]

        # Clamp coordinates of elements that extend beyond the frame's left/top edge.
        adjusted_bbox = [max(0, coord) for coord in adjusted_bbox]

        return adjusted_bbox

    def extract_panel_from_page(self, image_path: str, frame_data: Dict,
                                output_dir: str, page_index: int, frame_index: int) -> Optional[str]:
        """
        Extract a single panel (frame) from a manga page image.

        Args:
            image_path: Path to the manga page image
            frame_data: Frame annotation with coordinates
            output_dir: Directory to save the extracted panel
            page_index: Index of the current page
            frame_index: Index of the frame within the page

        Returns:
            Path to the extracted panel image, or None if extraction failed.
        """
        image = cv2.imread(image_path)
        if image is None:
            print(f"Warning: Could not load image {image_path}")
            return None

        try:
            xmin = int(frame_data.get('@xmin', 0))
            ymin = int(frame_data.get('@ymin', 0))
            xmax = int(frame_data.get('@xmax', image.shape[1]))
            ymax = int(frame_data.get('@ymax', image.shape[0]))

            # Clip the frame coordinates to the image bounds.
            xmin = max(0, xmin)
            ymin = max(0, ymin)
            xmax = min(image.shape[1], xmax)
            ymax = min(image.shape[0], ymax)

            if xmax <= xmin or ymax <= ymin:
                print(f"Warning: Invalid frame coordinates for page {page_index}, frame {frame_index}")
                return None

            cropped_panel = image[ymin:ymax, xmin:xmax]

            panel_filename = f"page_{page_index:04d}_panel_{frame_index:03d}.jpg"
            panel_path = os.path.join(output_dir, panel_filename)

            success = cv2.imwrite(panel_path, cropped_panel)
            if success:
                self.stats['extracted_panels'] += 1
                return panel_path
            else:
                print(f"Warning: Failed to save panel {panel_path}")
                return None

        except Exception as e:
            print(f"Error extracting panel from page {page_index}, frame {frame_index}: {str(e)}")
            return None

    def calculate_distance(self, bbox1: List[int], bbox2: List[int]) -> float:
        """Calculate Euclidean distance between centers of two bounding boxes."""
        cx1, cy1 = (bbox1[0] + bbox1[2]) / 2, (bbox1[1] + bbox1[3]) / 2
        cx2, cy2 = (bbox2[0] + bbox2[2]) / 2, (bbox2[1] + bbox2[3]) / 2
        return math.sqrt((cx1 - cx2) ** 2 + (cy1 - cy2) ** 2)

    def find_character_by_body_id(self, body_id: str, bodies: List[Dict]) -> Optional[str]:
        """Find character ID associated with a body ID."""
        for body in bodies:
            if body.get('@id') == body_id:
                return body.get('@character')
        return None

    def find_face_by_character(self, character_id: str, faces: List[Dict]) -> Optional[int]:
        """Find face index by character ID."""
        for i, face in enumerate(faces):
            # Panel-level face entries keep the character under 'attributes';
            # raw annotation entries use the '@character' key.
            face_character = face.get('attributes', {}).get('character_id') or face.get('@character')
            if face_character == character_id:
                return i
        return None

    def find_closest_face(self, bubble_bbox: List[int], faces: List[Dict]) -> Optional[int]:
        """Find closest face to bubble based on spatial proximity."""
        if not faces:
            return None

        min_distance = float('inf')
        closest_face_idx = None

        for i, face in enumerate(faces):
            face_bbox = [
                face['bbox'][0], face['bbox'][1],
                face['bbox'][2], face['bbox'][3]
            ]
            distance = self.calculate_distance(bubble_bbox, face_bbox)

            # Only accept faces within the configured proximity threshold.
            if distance < min_distance and distance <= self.proximity_threshold:
                min_distance = distance
                closest_face_idx = i

        return closest_face_idx

    def associate_bubble_to_face(self, text_item: Dict, faces: List[Dict],
                                 bodies: List[Dict], original_bodies: List[Dict]) -> Optional[int]:
        """Associate a speech bubble with a face using multiple strategies."""
        bubble_bbox = text_item['bbox']

        # Strategy 1: follow the explicit body reference, if the annotation provides one.
        body_id = text_item.get('body_ref')
        if body_id:
            character_id = self.find_character_by_body_id(body_id, original_bodies)
            if character_id:
                face_idx = self.find_face_by_character(character_id, faces)
                if face_idx is not None:
                    return face_idx

        # Strategy 2: fall back to the spatially closest face within the proximity threshold.
        closest_face_idx = self.find_closest_face(bubble_bbox, faces)
        if closest_face_idx is not None:
            return closest_face_idx

        # Strategy 3: if the panel contains exactly one face, assign the bubble to it.
        if len(faces) == 1:
            return 0

        return None

    def process_panel(self, page_data: Dict, frame_data: Dict, page_index: int,
                      frame_index: int, character_mapping: Dict[str, str],
                      image_path: str = None, panels_output_dir: str = None) -> Dict[str, Any]:
        """Process a single panel (frame) and create a GCN dataset entry."""
        # Crop and save the panel image if a page image and output directory were provided.
        panel_image_path = None
        if image_path and panels_output_dir:
            panel_image_path = self.extract_panel_from_page(
                image_path, frame_data, panels_output_dir, page_index, frame_index
            )

        # Annotation coordinates may be stored as strings; cast to int before arithmetic.
        frame_bbox = [
            int(frame_data['@xmin']), int(frame_data['@ymin']),
            int(frame_data['@xmax']), int(frame_data['@ymax'])
        ]

        panel_width = frame_bbox[2] - frame_bbox[0]
        panel_height = frame_bbox[3] - frame_bbox[1]

        panel_faces = []
        panel_bodies = []
        panel_bubbles = []

        # Collect faces that fall inside this frame.
        for face in page_data.get('face', []):
            face_bbox = [int(face['@xmin']), int(face['@ymin']), int(face['@xmax']), int(face['@ymax'])]
            if self.is_element_in_frame(face_bbox, frame_bbox):
                adjusted_bbox = self.adjust_coordinates_to_frame(face_bbox, frame_bbox)
                character_id = face.get('@character', '')
                character_name = character_mapping.get(character_id, character_id)

                panel_faces.append({
                    "face_id": len(panel_faces),
                    "bbox": adjusted_bbox,
                    "original_id": face.get('@id', ''),
                    "attributes": {
                        "character_id": character_id,
                        "character_name": character_name
                    }
                })

                if character_id:
                    self.stats['unique_characters'].add(character_id)

        # Collect bodies that fall inside this frame.
        for body in page_data.get('body', []):
            body_bbox = [int(body['@xmin']), int(body['@ymin']), int(body['@xmax']), int(body['@ymax'])]
            if self.is_element_in_frame(body_bbox, frame_bbox):
                adjusted_bbox = self.adjust_coordinates_to_frame(body_bbox, frame_bbox)
                character_id = body.get('@character', '')
                character_name = character_mapping.get(character_id, character_id)

                panel_bodies.append({
                    "body_id": len(panel_bodies),
                    "bbox": adjusted_bbox,
                    "original_id": body.get('@id', ''),
                    "attributes": {
                        "character_id": character_id,
                        "character_name": character_name
                    }
                })

        # Collect speech bubbles (text regions) that fall inside this frame.
        for text in page_data.get('text', []):
            text_bbox = [int(text['@xmin']), int(text['@ymin']), int(text['@xmax']), int(text['@ymax'])]
            if self.is_element_in_frame(text_bbox, frame_bbox):
                adjusted_bbox = self.adjust_coordinates_to_frame(text_bbox, frame_bbox)

                panel_bubbles.append({
                    "bubble_id": len(panel_bubbles),
                    "bbox": adjusted_bbox,
                    "text": text.get('#text', ''),
                    "original_id": text.get('@id', ''),
                    "body_ref": text.get('@bodyid', '')
                })

        # Link each bubble to a face within the panel.
        links = []
        original_bodies = page_data.get('body', [])

        for bubble in panel_bubbles:
            face_idx = self.associate_bubble_to_face(
                bubble, panel_faces, panel_bodies, original_bodies
            )
            if face_idx is not None:
                links.append({
                    "bubble_id": bubble['bubble_id'],
                    "face_id": face_idx
                })
                self.stats['successful_links'] += 1
            else:
                self.stats['failed_links'] += 1

        # Update running statistics.
        self.stats['total_panels'] += 1
        self.stats['total_bubbles'] += len(panel_bubbles)
        self.stats['total_faces'] += len(panel_faces)
        self.stats['total_bodies'] += len(panel_bodies)

        panel_id = f"page_{page_index:04d}_panel_{frame_index:03d}"

        return {
            "panel_id": panel_id,
            "width": panel_width,
            "height": panel_height,
            "panel_image_path": panel_image_path,
            "bubbles": panel_bubbles,
            "faces": panel_faces,
            "bodies": panel_bodies,
            "links": links,
            "metadata": {
                "original_page_index": page_index,
                "original_frame_index": frame_index,
                "original_frame_id": frame_data.get('@id', ''),
                "original_frame_bbox": frame_bbox
            }
        }

    def convert_dataset(self, input_file: str, output_file: str,
                        image_dir: str = None, panels_output_dir: str = None) -> Dict[str, Any]:
        """Convert a manga annotation file into the panel-based GCN format."""
        print(f"Loading manga data from: {input_file}")
        manga_data = self.load_manga_data(input_file)

        title = manga_data.get('title', 'Unknown')
        character_list = manga_data.get('character', [])
        character_mapping = self.build_character_mapping(character_list)

        print(f"Title: {title}")
        print(f"Characters found: {len(character_mapping)}")

        if panels_output_dir:
            os.makedirs(panels_output_dir, exist_ok=True)
            print(f"Panel extraction enabled. Output directory: {panels_output_dir}")

        pages = manga_data.get('page', [])
        converted_panels = []

        for page_idx, page_data in enumerate(pages):
            self.stats['total_pages'] += 1

            # Page images are named by zero-padded page index (e.g. 003.jpg).
            image_path = None
            if image_dir:
                image_path = os.path.join(
                    image_dir, f"{str(page_data.get('@index', page_idx)).zfill(3)}.jpg"
                )

            frames = page_data.get('frame', [])
            for frame_idx, frame_data in enumerate(frames):
                panel = self.process_panel(
                    page_data, frame_data, page_idx, frame_idx,
                    character_mapping, image_path, panels_output_dir
                )
                converted_panels.append(panel)

        dataset = {
            "title": title,
            "character_mapping": character_mapping,
            "panels": converted_panels,
            "conversion_stats": {
                "total_pages": self.stats['total_pages'],
                "total_panels": self.stats['total_panels'],
                "total_bubbles": self.stats['total_bubbles'],
                "total_faces": self.stats['total_faces'],
                "total_bodies": self.stats['total_bodies'],
                "successful_links": self.stats['successful_links'],
                "failed_links": self.stats['failed_links'],
                "extracted_panels": self.stats['extracted_panels'],
                "unique_characters": len(self.stats['unique_characters']),
                "link_success_rate": (self.stats['successful_links'] /
                                      max(1, self.stats['total_bubbles'])) * 100
            }
        }

        print(f"Saving converted dataset to: {output_file}")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(dataset, f, ensure_ascii=False, indent=2)

        self.print_conversion_stats()

        return dataset

    def print_conversion_stats(self):
        """Print conversion statistics."""
        print("\n=== Conversion Statistics ===")
        print(f"Total pages processed: {self.stats['total_pages']}")
        print(f"Total panels extracted: {self.stats['total_panels']}")
        print(f"Total speech bubbles: {self.stats['total_bubbles']}")
        print(f"Total faces: {self.stats['total_faces']}")
        print(f"Total bodies: {self.stats['total_bodies']}")
        print(f"Successful links: {self.stats['successful_links']}")
        print(f"Failed links: {self.stats['failed_links']}")
        print(f"Panel images extracted: {self.stats['extracted_panels']}")
        print(f"Unique characters: {len(self.stats['unique_characters'])}")
        if self.stats['total_bubbles'] > 0:
            success_rate = (self.stats['successful_links'] / self.stats['total_bubbles']) * 100
            print(f"Link success rate: {success_rate:.1f}%")
        print("=" * 30)

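# Usage sketch for a single book, calling the converter directly (the paths and
# the book name "ARMS" are illustrative; adjust them to your own layout):
#
#   converter = MangaPanelDatasetConverter(proximity_threshold=200.0)
#   converter.convert_dataset(
#       input_file="annotations_json/ARMS.json",
#       output_file="panel_data/ARMS.json",
#       image_dir="Manga109/images/ARMS",
#       panels_output_dir="panels/ARMS",
#   )
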
def json_to_gcn(config):
    """Convert every per-book annotation JSON under config['root_json'] into a panel-level dataset."""
    base_dir = config["root"]
    manga109_root_dir = config["manga109_root_dir"]
    root_json = config["root_json"]

    for book_json in os.listdir(root_json):
        book_name = os.path.splitext(book_json)[0]
        panel_dir = os.path.join(base_dir, "panels", book_name)
        os.makedirs(panel_dir, exist_ok=True)

        # Make sure the directory for the converted JSON exists before writing.
        output_dir = os.path.join(base_dir, "panel_data")
        os.makedirs(output_dir, exist_ok=True)

        converter = MangaPanelDatasetConverter()

        input_file = os.path.join(root_json, book_json)
        output_file = os.path.join(output_dir, book_json)
        image_dir = os.path.join(manga109_root_dir, "images", book_name)

        try:
            converter.convert_dataset(
                input_file,
                output_file,
                image_dir,
                panel_dir
            )
            print("\nConversion completed successfully!")
            print(f"GCN dataset saved to: {output_file}")
            print(f"Panel images saved to: {panel_dir}")

        except Exception as e:
            print(f"Error during conversion: {str(e)}")
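
# Minimal command-line entry point (a sketch): it assumes the config argument is
# a JSON file providing the three keys json_to_gcn reads ("root",
# "manga109_root_dir", "root_json"). The --config flag name is illustrative.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Convert Manga109-style annotation JSON into panel-level GCN datasets."
    )
    parser.add_argument(
        "--config", required=True,
        help="Path to a JSON config with 'root', 'manga109_root_dir' and 'root_json'."
    )
    args = parser.parse_args()

    with open(args.config, "r", encoding="utf-8") as f:
        cli_config = json.load(f)

    json_to_gcn(cli_config)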