pz / pdf2zh /high_level.py
github-actions[bot]
GitHub deploy: 430ada5a033af12b22377dc4dedd36c9b82e0183
35bba99
"""Functions that can be used for the most common use-cases for pdf2zh.six"""
import asyncio
import io
import os
import sys
import tempfile
import urllib.request
from asyncio import CancelledError
from pathlib import Path
from string import Template
from typing import Any, BinaryIO, List, Optional, Dict
import numpy as np
import requests
import tqdm
from pdfminer.pdfdocument import PDFDocument
from pdfminer.pdfexceptions import PDFValueError
from pdfminer.pdfinterp import PDFResourceManager
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfparser import PDFParser
from pymupdf import Document, Font
from pdf2zh.converter import TranslateConverter
from pdf2zh.doclayout import OnnxModel
from pdf2zh.pdfinterp import PDFPageInterpreterEx
from pdf2zh.config import ConfigManager
NOTO_NAME = "noto"
noto_list = [
"am", # Amharic
"ar", # Arabic
"bn", # Bengali
"bg", # Bulgarian
"chr", # Cherokee
"el", # Greek
"gu", # Gujarati
"iw", # Hebrew
"hi", # Hindi
"kn", # Kannada
"ml", # Malayalam
"mr", # Marathi
"ru", # Russian
"sr", # Serbian
"ta", # Tamil
"te", # Telugu
"th", # Thai
"ur", # Urdu
"uk", # Ukrainian
]
def check_files(files: List[str]) -> List[str]:
files = [
f for f in files if not f.startswith("http://")
] # exclude online files, http
files = [
f for f in files if not f.startswith("https://")
] # exclude online files, https
missing_files = [file for file in files if not os.path.exists(file)]
return missing_files
def translate_patch(
inf: BinaryIO,
pages: Optional[list[int]] = None,
vfont: str = "",
vchar: str = "",
thread: int = 0,
doc_zh: Document = None,
lang_in: str = "",
lang_out: str = "",
service: str = "",
noto_name: str = "",
noto: Font = None,
callback: object = None,
cancellation_event: asyncio.Event = None,
model: OnnxModel = None,
envs: Dict = None,
prompt: Template = None,
**kwarg: Any,
) -> None:
rsrcmgr = PDFResourceManager()
layout = {}
device = TranslateConverter(
rsrcmgr,
vfont,
vchar,
thread,
layout,
lang_in,
lang_out,
service,
noto_name,
noto,
envs,
prompt,
)
assert device is not None
obj_patch = {}
interpreter = PDFPageInterpreterEx(rsrcmgr, device, obj_patch)
if pages:
total_pages = len(pages)
else:
total_pages = doc_zh.page_count
parser = PDFParser(inf)
doc = PDFDocument(parser)
with tqdm.tqdm(total=total_pages) as progress:
for pageno, page in enumerate(PDFPage.create_pages(doc)):
if cancellation_event and cancellation_event.is_set():
raise CancelledError("task cancelled")
if pages and (pageno not in pages):
continue
progress.update()
if callback:
callback(progress)
page.pageno = pageno
pix = doc_zh[page.pageno].get_pixmap()
image = np.fromstring(pix.samples, np.uint8).reshape(
pix.height, pix.width, 3
)[:, :, ::-1]
page_layout = model.predict(image, imgsz=int(pix.height / 32) * 32)[0]
# kdtree 是不可能 kdtree 的,不如直接渲染成图片,用空间换时间
box = np.ones((pix.height, pix.width))
h, w = box.shape
vcls = ["abandon", "figure", "table", "isolate_formula", "formula_caption"]
for i, d in enumerate(page_layout.boxes):
if page_layout.names[int(d.cls)] not in vcls:
x0, y0, x1, y1 = d.xyxy.squeeze()
x0, y0, x1, y1 = (
np.clip(int(x0 - 1), 0, w - 1),
np.clip(int(h - y1 - 1), 0, h - 1),
np.clip(int(x1 + 1), 0, w - 1),
np.clip(int(h - y0 + 1), 0, h - 1),
)
box[y0:y1, x0:x1] = i + 2
for i, d in enumerate(page_layout.boxes):
if page_layout.names[int(d.cls)] in vcls:
x0, y0, x1, y1 = d.xyxy.squeeze()
x0, y0, x1, y1 = (
np.clip(int(x0 - 1), 0, w - 1),
np.clip(int(h - y1 - 1), 0, h - 1),
np.clip(int(x1 + 1), 0, w - 1),
np.clip(int(h - y0 + 1), 0, h - 1),
)
box[y0:y1, x0:x1] = 0
layout[page.pageno] = box
# 新建一个 xref 存放新指令流
page.page_xref = doc_zh.get_new_xref() # hack 插入页面的新 xref
doc_zh.update_object(page.page_xref, "<<>>")
doc_zh.update_stream(page.page_xref, b"")
doc_zh[page.pageno].set_contents(page.page_xref)
interpreter.process_page(page)
device.close()
return obj_patch
def translate_stream(
stream: bytes,
pages: Optional[list[int]] = None,
lang_in: str = "",
lang_out: str = "",
service: str = "",
thread: int = 0,
vfont: str = "",
vchar: str = "",
callback: object = None,
cancellation_event: asyncio.Event = None,
model: OnnxModel = None,
envs: Dict = None,
prompt: Template = None,
**kwarg: Any,
):
font_list = [("tiro", None)]
font_path = download_remote_fonts(lang_out.lower())
noto_name = NOTO_NAME
noto = Font(noto_name, font_path)
font_list.append((noto_name, font_path))
doc_en = Document(stream=stream)
stream = io.BytesIO()
doc_en.save(stream)
doc_zh = Document(stream=stream)
page_count = doc_zh.page_count
# font_list = [("GoNotoKurrent-Regular.ttf", font_path), ("tiro", None)]
font_id = {}
for page in doc_zh:
for font in font_list:
font_id[font[0]] = page.insert_font(font[0], font[1])
xreflen = doc_zh.xref_length()
for xref in range(1, xreflen):
for label in ["Resources/", ""]: # 可能是基于 xobj 的 res
try: # xref 读写可能出错
font_res = doc_zh.xref_get_key(xref, f"{label}Font")
if font_res[0] == "dict":
for font in font_list:
font_exist = doc_zh.xref_get_key(xref, f"{label}Font/{font[0]}")
if font_exist[0] == "null":
doc_zh.xref_set_key(
xref,
f"{label}Font/{font[0]}",
f"{font_id[font[0]]} 0 R",
)
except Exception:
pass
fp = io.BytesIO()
doc_zh.save(fp)
obj_patch: dict = translate_patch(fp, **locals())
for obj_id, ops_new in obj_patch.items():
# ops_old=doc_en.xref_stream(obj_id)
# print(obj_id)
# print(ops_old)
# print(ops_new.encode())
doc_zh.update_stream(obj_id, ops_new.encode())
doc_en.insert_file(doc_zh)
for id in range(page_count):
doc_en.move_page(page_count + id, id * 2 + 1)
doc_zh.subset_fonts(fallback=True)
doc_en.subset_fonts(fallback=True)
return (
doc_zh.write(deflate=True, garbage=3, use_objstms=1),
doc_en.write(deflate=True, garbage=3, use_objstms=1),
)
def convert_to_pdfa(input_path, output_path):
"""
Convert PDF to PDF/A format
Args:
input_path: Path to source PDF file
output_path: Path to save PDF/A file
"""
from pikepdf import Dictionary, Name, Pdf
# Open the PDF file
pdf = Pdf.open(input_path)
# Add PDF/A conformance metadata
metadata = {
"pdfa_part": "2",
"pdfa_conformance": "B",
"title": pdf.docinfo.get("/Title", ""),
"author": pdf.docinfo.get("/Author", ""),
"creator": "PDF Math Translate",
}
with pdf.open_metadata() as meta:
meta.load_from_docinfo(pdf.docinfo)
meta["pdfaid:part"] = metadata["pdfa_part"]
meta["pdfaid:conformance"] = metadata["pdfa_conformance"]
# Create OutputIntent dictionary
output_intent = Dictionary(
{
"/Type": Name("/OutputIntent"),
"/S": Name("/GTS_PDFA1"),
"/OutputConditionIdentifier": "sRGB IEC61966-2.1",
"/RegistryName": "http://www.color.org",
"/Info": "sRGB IEC61966-2.1",
}
)
# Add output intent to PDF root
if "/OutputIntents" not in pdf.Root:
pdf.Root.OutputIntents = [output_intent]
else:
pdf.Root.OutputIntents.append(output_intent)
# Save as PDF/A
pdf.save(output_path, linearize=True)
pdf.close()
def translate(
files: list[str],
output: str = "",
pages: Optional[list[int]] = None,
lang_in: str = "",
lang_out: str = "",
service: str = "",
thread: int = 0,
vfont: str = "",
vchar: str = "",
callback: object = None,
compatible: bool = False,
cancellation_event: asyncio.Event = None,
model: OnnxModel = None,
envs: Dict = None,
prompt: Template = None,
**kwarg: Any,
):
if not files:
raise PDFValueError("No files to process.")
missing_files = check_files(files)
if missing_files:
print("The following files do not exist:", file=sys.stderr)
for file in missing_files:
print(f" {file}", file=sys.stderr)
raise PDFValueError("Some files do not exist.")
result_files = []
for file in files:
if type(file) is str and (
file.startswith("http://") or file.startswith("https://")
):
print("Online files detected, downloading...")
try:
r = requests.get(file, allow_redirects=True)
if r.status_code == 200:
with tempfile.NamedTemporaryFile(
suffix=".pdf", delete=False
) as tmp_file:
print(f"Writing the file: {file}...")
tmp_file.write(r.content)
file = tmp_file.name
else:
r.raise_for_status()
except Exception as e:
raise PDFValueError(
f"Errors occur in downloading the PDF file. Please check the link(s).\nError:\n{e}"
)
filename = os.path.splitext(os.path.basename(file))[0]
# If the commandline has specified converting to PDF/A format
# --compatible / -cp
if compatible:
with tempfile.NamedTemporaryFile(
suffix="-pdfa.pdf", delete=False
) as tmp_pdfa:
print(f"Converting {file} to PDF/A format...")
convert_to_pdfa(file, tmp_pdfa.name)
doc_raw = open(tmp_pdfa.name, "rb")
os.unlink(tmp_pdfa.name)
else:
doc_raw = open(file, "rb")
s_raw = doc_raw.read()
doc_raw.close()
if file.startswith(tempfile.gettempdir()):
os.unlink(file)
s_mono, s_dual = translate_stream(
s_raw,
**locals(),
)
file_mono = Path(output) / f"{filename}-mono.pdf"
file_dual = Path(output) / f"{filename}-dual.pdf"
doc_mono = open(file_mono, "wb")
doc_dual = open(file_dual, "wb")
doc_mono.write(s_mono)
doc_dual.write(s_dual)
doc_mono.close()
doc_dual.close()
result_files.append((str(file_mono), str(file_dual)))
return result_files
def download_remote_fonts(lang: str):
URL_PREFIX = "https://github.com/timelic/source-han-serif/releases/download/main/"
LANG_NAME_MAP = {
**{la: "GoNotoKurrent-Regular.ttf" for la in noto_list},
**{
la: f"SourceHanSerif{region}-Regular.ttf"
for region, langs in {
"CN": ["zh-cn", "zh-hans", "zh"],
"TW": ["zh-tw", "zh-hant"],
"JP": ["ja"],
"KR": ["ko"],
}.items()
for la in langs
},
}
font_name = LANG_NAME_MAP.get(lang, "GoNotoKurrent-Regular.ttf")
# docker
font_path = ConfigManager.get("NOTO_FONT_PATH", Path("/app", font_name).as_posix())
if not Path(font_path).exists():
font_path = Path(tempfile.gettempdir(), font_name).as_posix()
if not Path(font_path).exists():
print(f"Downloading {font_name}...")
urllib.request.urlretrieve(f"{URL_PREFIX}{font_name}", font_path)
return font_path