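"""Gradio app for a Hugging Face Space that converts uploaded PDFs into per-page
images, optionally samples a percentage of pages per PDF, offers a ZIP download,
and can push the images (plus a generated dataset card) to a Hugging Face dataset
repository."""
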
import multiprocessing
import os
import random
import shutil
import tempfile
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

# Enable hf_transfer for faster uploads. The flag is read when huggingface_hub
# is imported, so it must be set before that import.
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"

import fitz  # PyMuPDF
import gradio as gr
from huggingface_hub import DatasetCard, DatasetCardData, HfApi
from PIL import Image

from dataset_card_template import DATASET_CARD_TEMPLATE

CPU_COUNT = multiprocessing.cpu_count()
MAX_WORKERS = min(32, CPU_COUNT)  # Use the CPU count as the thread-pool size, capped at 32
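

# Convert a single PDF into per-page JPEG images, optionally sampling a random
# subset of pages. Returns (image_paths, error_message_or_None, pages_converted).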
def process_pdf(pdf_file, sample_percentage, temp_dir):
    try:
        pdf_path = pdf_file.name
        doc = fitz.open(pdf_path)
        total_pages = len(doc)

        pages_to_convert = int(total_pages * (sample_percentage / 100))
        pages_to_convert = max(
            1, min(pages_to_convert, total_pages)
        )  # Ensure at least one page and not more than total pages

        selected_pages = (
            sorted(random.sample(range(total_pages), pages_to_convert))
            if 0 < sample_percentage < 100
            else range(total_pages)
        )

        images = []
        for page_num in selected_pages:
            page = doc[page_num]
            pix = page.get_pixmap()  # Render the page at the default resolution
            image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
            image_path = os.path.join(
                temp_dir, f"{os.path.basename(pdf_path)}_page_{page_num + 1}.jpg"
            )
            image.save(image_path, "JPEG", quality=85, optimize=True)
            images.append(image_path)

        doc.close()
        return images, None, len(images)
    except Exception as e:
        return [], f"Error processing {pdf_file.name}: {str(e)}", 0
def pdf_to_images(pdf_files, sample_percentage, temp_dir, progress=gr.Progress()):
    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)

    progress(0, desc="Starting conversion")
    all_images = []
    skipped_pdfs = []
    total_pages = sum(len(fitz.open(pdf.name)) for pdf in pdf_files)
    processed_pages = 0

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_pdf = {
            executor.submit(process_pdf, pdf, sample_percentage, temp_dir): pdf
            for pdf in pdf_files
        }
        for future in as_completed(future_to_pdf):
            pdf = future_to_pdf[future]
            images, error, pages_processed = future.result()
            if error:
                skipped_pdfs.append(error)
                gr.Info(error)
            else:
                all_images.extend(images)
                processed_pages += pages_processed
            progress((processed_pages / total_pages), desc=f"Processing {pdf.name}")

    message = f"Saved {len(all_images)} images to temporary directory"
    if skipped_pdfs:
        message += f"\nSkipped {len(skipped_pdfs)} PDFs due to errors: {', '.join(skipped_pdfs)}"
    return all_images, message
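

# Map an image count to the size_categories bucket used in Hub dataset metadata.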
def get_size_category(num_images):
    if num_images < 1000:
        return "n<1K"
    elif num_images < 10000:
        return "1K<n<10K"
    elif num_images < 100000:
        return "10K<n<100K"
    elif num_images < 1000000:
        return "100K<n<1M"
    else:
        return "n>1M"
def process_pdfs(
    pdf_files,
    sample_percentage,
    hf_repo,
    create_zip,
    private_repo,
    oauth_token: gr.OAuthToken | None,
    progress=gr.Progress(),
):
    if not pdf_files:
        return (
            None,
            None,
            gr.Markdown(
                "⚠️ No PDF files uploaded. Please upload at least one PDF file."
            ),
        )
    if oauth_token is None:
        return (
            None,
            None,
            gr.Markdown(
                "⚠️ Not logged in to Hugging Face. Please log in to upload to a Hugging Face dataset."
            ),
        )
    try:
        temp_dir = tempfile.mkdtemp()
        images_dir = os.path.join(temp_dir, "images")
        os.makedirs(images_dir)

        progress(0, desc="Starting PDF processing")
        images, message = pdf_to_images(pdf_files, sample_percentage, images_dir)

        # Create a new directory for sampled images
        sampled_images_dir = os.path.join(temp_dir, "sampled_images")
        os.makedirs(sampled_images_dir)

        # Move sampled images to the new directory and update paths
        updated_images = []
        for image in images:
            new_path = os.path.join(sampled_images_dir, os.path.basename(image))
            shutil.move(image, new_path)
            updated_images.append(new_path)

        # Update the images list with new paths
        images = updated_images

        zip_path = None
        if create_zip:
            # Create a zip file of the sampled images
            zip_path = os.path.join(temp_dir, "converted_images.zip")
            with zipfile.ZipFile(zip_path, "w") as zipf:
                progress(0, desc="Zipping images")
                for image in progress.tqdm(images, desc="Zipping images"):
                    zipf.write(
                        os.path.join(sampled_images_dir, os.path.basename(image)),
                        os.path.basename(image),
                    )
            message += f"\nCreated zip file with {len(images)} images"
        if hf_repo:
            try:
                hf_api = HfApi(token=oauth_token.token)
                hf_api.create_repo(
                    hf_repo,
                    repo_type="dataset",
                    private=private_repo,
                    exist_ok=True,  # Don't fail if the repo already exists
                )
                # Upload only the sampled images directory
                hf_api.upload_folder(
                    folder_path=sampled_images_dir,
                    repo_id=hf_repo,
                    repo_type="dataset",
                    path_in_repo="images",
                )

                # Determine size category
                size_category = get_size_category(len(images))

                # Create DatasetCardData instance
                card_data = DatasetCardData(
                    tags=["created-with-pdfs-to-page-images-converter", "pdf-to-image"],
                    size_categories=[size_category],
                )

                # Create and populate the dataset card
                card = DatasetCard.from_template(
                    card_data,
                    template_path=None,  # Use default template
                    hf_repo=hf_repo,
                    num_images=len(images),
                    num_pdfs=len(pdf_files),
                    sample_size=sample_percentage
                    if sample_percentage > 0
                    else "All pages",
                    creation_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                )

                # Add our custom content to the card
                card.text = DATASET_CARD_TEMPLATE.format(
                    hf_repo=hf_repo,
                    num_images=len(images),
                    num_pdfs=len(pdf_files),
                    sample_size=sample_percentage
                    if sample_percentage > 0
                    else "All pages",
                    creation_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    size_category=size_category,
                )
                card.push_to_hub(hf_repo, token=oauth_token.token)

                repo_url = f"https://huggingface.co/datasets/{hf_repo}"
                message += f"\nUploaded dataset card to Hugging Face repo: [{hf_repo}]({repo_url})"
            except Exception as e:
                message += f"\nFailed to upload to Hugging Face: {str(e)}"

        return images, zip_path, message
    except Exception as e:
        if "temp_dir" in locals():
            shutil.rmtree(temp_dir)
        return None, None, f"An error occurred: {str(e)}"


# Define the Gradio interface
with gr.Blocks() as demo:
    gr.HTML(
        """<h1 style='text-align: center;'>PDFs to Page Images Converter</h1>
        <center><i> 📁 Convert PDFs to an image dataset, splitting pages into individual images 📁 </i></center>"""
    )
    gr.HTML(
        """
        <div style="display: flex; justify-content: center; align-items: center; max-width: 1000px; margin: 0 auto;">
            <div style="flex: 1; padding-right: 20px;">
                <p>This app allows you to:</p>
                <ol>
                    <li>Upload one or more PDF files</li>
                    <li>Convert each page of the PDFs into separate image files</li>
                    <li>(Optionally) sample a percentage of pages from each PDF</li>
                    <li>(Optionally) create a downloadable ZIP file of the converted images</li>
                    <li>(Optionally) upload the images to a Hugging Face dataset repository</li>
                </ol>
            </div>
            <div style="flex: 1;">
                <img src="https://huggingface.co/spaces/Dataset-Creation-Tools/pdf-to-page-images-dataset/resolve/main/assets/PDF%20page%20split%20illustration.png"
                     alt="PDF page split illustration"
                     style="max-width: 50%; height: auto;">
            </div>
        </div>
        """
    )
    with gr.Row():
        pdf_files = gr.File(
            file_count="multiple", label="Upload PDF(s)", file_types=[".pdf"]
        )
    with gr.Row():
        sample_percentage = gr.Slider(
            minimum=0,
            maximum=100,
            value=100,
            step=1,
            label="Percentage of pages to sample per PDF",
            info="100% converts every page; values in between randomly sample that percentage of pages (0% also keeps all pages)",
        )
        create_zip = gr.Checkbox(label="Create ZIP file of images?", value=False)
    with gr.Accordion("Hugging Face Upload Options", open=True):
        gr.LoginButton(size="sm")
        with gr.Row():
            hf_repo = gr.Textbox(
                label="Hugging Face Repo",
                placeholder="username/repo-name",
                info="Enter the Hugging Face repository name in the format 'username/repo-name'",
            )
            private_repo = gr.Checkbox(label="Make repository private?", value=False)
with gr.Accordion("View converted images", open=False): | |
output_gallery = gr.Gallery(label="Converted Images") | |
status_text = gr.Markdown(label="Status") | |
download_button = gr.File(label="Download Converted Images") | |
submit_button = gr.Button("Convert PDFs to page images") | |
submit_button.click( | |
process_pdfs, | |
inputs=[pdf_files, sample_percentage, hf_repo, create_zip, private_repo], | |
outputs=[output_gallery, download_button, status_text], | |
) | |
demo.launch() | |
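

# Usage sketch (an assumption, not part of this app): after the upload step the
# images live under `images/` in the dataset repo, and such image-only repos can
# typically be loaded with the `datasets` library's image-folder support, e.g.:
#
#   from datasets import load_dataset
#   ds = load_dataset("username/repo-name")  # replace with your own repo id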