|
from typing import Dict |
|
from enum import Enum |
|
|
|
from pdfminer.pdfinterp import PDFGraphicState, PDFResourceManager |
|
from pdfminer.pdffont import PDFCIDFont |
|
from pdfminer.converter import PDFConverter |
|
from pdfminer.pdffont import PDFUnicodeNotDefined |
|
from pdfminer.utils import apply_matrix_pt, mult_matrix |
|
from pdfminer.layout import ( |
|
LTChar, |
|
LTFigure, |
|
LTLine, |
|
LTPage, |
|
) |
|
import logging |
|
import re |
|
import concurrent.futures |
|
import numpy as np |
|
import unicodedata |
|
from string import Template |
|
from tenacity import retry, wait_fixed |
|
from pdf2zh.translator import ( |
|
AzureOpenAITranslator, |
|
BaseTranslator, |
|
GoogleTranslator, |
|
BingTranslator, |
|
DeepLTranslator, |
|
DeepLXTranslator, |
|
OllamaTranslator, |
|
OpenAITranslator, |
|
ZhipuTranslator, |
|
ModelScopeTranslator, |
|
SiliconTranslator, |
|
GeminiTranslator, |
|
AzureTranslator, |
|
TencentTranslator, |
|
DifyTranslator, |
|
AnythingLLMTranslator, |
|
XinferenceTranslator, |
|
ArgosTranslator, |
|
GorkTranslator, |
|
GroqTranslator, |
|
DeepseekTranslator, |
|
OpenAIlikedTranslator, |
|
) |
|
from pymupdf import Font |
|
|
|
log = logging.getLogger(__name__) |
|
|
|
|
|
class PDFConverterEx(PDFConverter):
    """PDFConverter variant whose page/figure hooks return ``receive_layout`` output.

    Compared with the base converter, this class:
    * sizes the page from ``page.cropbox`` (transformed by the CTM),
    * propagates ``pageid`` from the enclosing item to nested figures,
    * tags every rendered ``LTChar`` with its original ``cid`` and ``font``.
    """

    def __init__(
        self,
        rsrcmgr: PDFResourceManager,
    ) -> None:
        """Initialise the base converter with no output stream, utf-8 codec, page number 1 and no layout params."""
        super().__init__(rsrcmgr, None, "utf-8", 1, None)

    def begin_page(self, page, ctm) -> None:
        """Start a new ``LTPage`` whose box is the CTM-transformed crop box, anchored at the origin."""
        left, bottom, right, top = page.cropbox
        left, bottom = apply_matrix_pt(ctm, (left, bottom))
        right, top = apply_matrix_pt(ctm, (right, top))
        # Only the extent is kept; the page box always starts at (0, 0).
        page_box = (0, 0, abs(left - right), abs(bottom - top))
        self.cur_item = LTPage(page.pageno, page_box)

    def end_page(self, page):
        """Finish the page and return whatever ``receive_layout`` produces for it."""
        finished_page = self.cur_item
        return self.receive_layout(finished_page)

    def begin_figure(self, name, bbox, matrix) -> None:
        """Push the current item and open a nested ``LTFigure`` that inherits its ``pageid``."""
        parent = self.cur_item
        self._stack.append(parent)
        self.cur_item = LTFigure(name, bbox, mult_matrix(matrix, self.ctm))
        # The figure carries the same page id as the item that encloses it.
        self.cur_item.pageid = parent.pageid

    def end_figure(self, _: str) -> None:
        """Close the current figure, attach it to its parent and run ``receive_layout`` on it.

        NOTE(review): the annotation says ``None`` but the result of
        ``receive_layout`` is returned; the signature is left unchanged here.
        """
        finished = self.cur_item
        assert isinstance(finished, LTFigure), str(type(finished))
        parent = self._stack.pop()
        self.cur_item = parent
        parent.add(finished)
        return self.receive_layout(finished)

    def render_char(
        self,
        matrix,
        font,
        fontsize: float,
        scaling: float,
        rise: float,
        cid: int,
        ncs,
        graphicstate: PDFGraphicState,
    ) -> float:
        """Create an ``LTChar`` for ``cid``, add it to the current item and return its advance.

        The created item additionally gets ``cid`` and ``font`` attributes so
        that later stages can re-emit the original glyph.
        """
        try:
            text = font.to_unichr(cid)
            assert isinstance(text, str), str(type(text))
        except PDFUnicodeNotDefined:
            # No unicode mapping for this cid: defer to the base-class fallback.
            text = self.handle_undefined_char(font, cid)
        char_item = LTChar(
            matrix,
            font,
            fontsize,
            scaling,
            rise,
            text,
            font.char_width(cid),
            font.char_disp(cid),
            ncs,
            graphicstate,
        )
        self.cur_item.add(char_item)
        # Remember the source character code and font object on the item itself.
        char_item.cid = cid
        char_item.font = font
        return char_item.adv
|
|
|
|
|
class Paragraph:
    """Mutable geometry/typography record for one paragraph.

    Attributes (all plain instance attributes, set once here and updated by
    the layout pass in this file):
        y, x:    starting coordinates used when typesetting the paragraph.
        x0, x1:  left / right bounds.
        y0, y1:  lower / upper bounds.
        size:    font size used for the paragraph text.
        brk:     whether a line break was detected inside the paragraph.
    """

    def __init__(self, y, x, x0, x1, y0, y1, size, brk):
        # Assignment order is kept (y, x, x0, x1, y0, y1, size, brk).
        self.y, self.x = y, x
        self.x0, self.x1 = x0, x1
        self.y0, self.y1 = y0, y1
        self.size, self.brk = size, brk
|
|
|
|
|
|
|
class TranslateConverter(PDFConverterEx):
    """Converter that translates page text and re-typesets it as PDF operators.

    ``receive_layout`` splits the characters of a page (or figure) into
    paragraphs, replaces runs of characters that must not be translated
    (e.g. math-font glyphs, sub/superscripts, reserved layout regions) with
    ``{vN}`` placeholders, sends each paragraph to the configured translator
    and finally emits a content-stream string that draws the translated text
    together with the preserved glyphs and lines.

    NOTE(review): ``self.fontmap`` and ``self.fontid`` are read by
    ``receive_layout`` but are not assigned anywhere in this class; they are
    presumably injected by the caller before conversion starts.
    """

    def __init__(
        self,
        rsrcmgr,
        vfont: str = None,
        vchar: str = None,
        thread: int = 0,
        layout=None,
        lang_in: str = "",
        lang_out: str = "",
        service: str = "",
        noto_name: str = "",
        noto: Font = None,
        envs: Dict = None,
        prompt: Template = None,
    ) -> None:
        """Configure the converter and instantiate the translation backend.

        Args:
            rsrcmgr: pdfminer resource manager passed to the base converter.
            vfont: optional regex; fonts matching it are treated as preserved
                (non-translated) fonts instead of the built-in font pattern.
            vchar: optional regex; characters matching it are preserved
                instead of using the built-in Unicode-category rule.
            thread: number of translation worker threads. ``0`` (the default)
                lets ``ThreadPoolExecutor`` choose its own default size.
            layout: mapping ``pageid -> 2-D array`` of per-pixel layout
                classes. Defaults to a fresh empty dict per instance.
            lang_in: source language passed to the translator.
            lang_out: target language passed to the translator.
            service: ``"name"`` or ``"name:model"`` selecting the translator.
            noto_name: font id used for glyphs rendered with ``noto``.
            noto: pymupdf ``Font`` used as the fallback font.
            envs: environment overrides forwarded to the translator.
            prompt: optional prompt template forwarded to the translator.

        Raises:
            ValueError: if ``service`` names no known translator.
        """
        super().__init__(rsrcmgr)
        self.vfont = vfont
        self.vchar = vchar
        self.thread = thread
        # A fresh dict per instance; a `{}` default would be shared by all instances.
        self.layout = {} if layout is None else layout
        self.noto_name = noto_name
        self.noto = noto
        self.translator: BaseTranslator = None
        # "service:model" -> ("service", "model"); the model part is optional.
        param = service.split(":", 1)
        service_name = param[0]
        service_model = param[1] if len(param) > 1 else None
        if not envs:
            envs = {}
        translator_classes = (
            GoogleTranslator,
            BingTranslator,
            DeepLTranslator,
            DeepLXTranslator,
            OllamaTranslator,
            XinferenceTranslator,
            AzureOpenAITranslator,
            OpenAITranslator,
            ZhipuTranslator,
            ModelScopeTranslator,
            SiliconTranslator,
            GeminiTranslator,
            AzureTranslator,
            TencentTranslator,
            DifyTranslator,
            AnythingLLMTranslator,
            ArgosTranslator,
            GorkTranslator,
            GroqTranslator,
            DeepseekTranslator,
            OpenAIlikedTranslator,
        )
        for translator in translator_classes:
            if service_name == translator.name:
                self.translator = translator(lang_in, lang_out, service_model, envs=envs, prompt=prompt)
        if not self.translator:
            raise ValueError("Unsupported translation service")

    def receive_layout(self, ltpage: LTPage):
        """Translate the text of ``ltpage`` and return the PDF operators that draw it.

        Args:
            ltpage: the finished layout container (page or figure) whose
                ``LTChar`` / ``LTLine`` children are processed.

        Returns:
            A string of the form ``"BT ... ET "`` containing text-showing and
            line-drawing operators for the translated content.
        """
        # --- paragraph state -------------------------------------------------
        sstk: list[str] = []            # paragraph texts (with {vN} placeholders)
        pstk: list[Paragraph] = []      # paragraph geometry, parallel to sstk
        vbkt: int = 0                   # open-parenthesis counter inside a preserved group
        # --- current preserved group ----------------------------------------
        vstk: list[LTChar] = []         # characters of the group being collected
        vlstk: list[LTLine] = []        # lines belonging to the group being collected
        vfix: float = 0                 # vertical offset of the group relative to its text
        # --- finished preserved groups (index N of "{vN}") -------------------
        var: list[list[LTChar]] = []    # characters per group
        varl: list[list[LTLine]] = []   # lines per group
        varf: list[float] = []          # vertical offset per group
        vlen: list[float] = []          # width per group
        # --- page-wide state -------------------------------------------------
        lstk: list[LTLine] = []         # lines that are redrawn unchanged
        xt: LTChar = None               # previous character
        xt_cls: int = -1                # layout class of the previous character
        vmax: float = ltpage.width / 4  # x-jump threshold that ends a preserved group

        def vflag(font: str, char: str):
            """Return True when (font, char) must be preserved instead of translated."""
            if isinstance(font, bytes):
                try:
                    font = font.decode('utf-8')
                except UnicodeDecodeError:
                    font = ""
            # Keep only the part after the last "+" of the font name.
            font = font.split("+")[-1]
            if re.match(r"\(cid:", char):
                return True

            # Font-name rule: user regex if given, otherwise the built-in pattern.
            if self.vfont:
                if re.match(self.vfont, font):
                    return True
            else:
                if re.match(
                    r"(CM[^R]|MS.M|XY|MT|BL|RM|EU|LA|RS|LINE|LCIRCLE|TeX-|rsfs|txsy|wasy|stmary|.*Mono|.*Code|.*Ital|.*Sym|.*Math)",
                    font,
                ):
                    return True

            # Character rule: user regex if given, otherwise Unicode categories
            # (modifiers, math symbols, separators) or the U+0370..U+03FF block.
            if self.vchar:
                if re.match(self.vchar, char):
                    return True
            else:
                if (
                    char
                    and char != " "
                    and (
                        unicodedata.category(char[0])
                        in ["Lm", "Mn", "Sk", "Sm", "Zl", "Zp", "Zs"]
                        or ord(char[0]) in range(0x370, 0x400)
                    )
                ):
                    return True
            return False

        def layout_class(item):
            """Look up the layout class at the item's (x0, y0), clamped to the layout array."""
            layout = self.layout[ltpage.pageid]
            h, w = layout.shape
            cx = np.clip(int(item.x0), 0, w - 1)
            cy = np.clip(int(item.y0), 0, h - 1)
            return layout[cy, cx]

        ############################################################
        # A. Parse the original layout into paragraphs and preserved groups.
        for child in ltpage:
            if isinstance(child, LTChar):
                cur_v = False
                cls = layout_class(child)

                # Bullets are always forced into class 0.
                if child.get_text() == "•":
                    cls = 0

                # Does this character belong to a preserved group?
                if (
                    cls == 0
                    # Noticeably smaller than the paragraph font (sub/superscript-like).
                    or (cls == xt_cls and len(sstk[-1].strip()) > 1 and child.size < pstk[-1].size * 0.79)
                    or vflag(child.fontname, child.get_text())
                    # Both diagonal matrix entries are zero (rotated text).
                    or (child.matrix[0] == 0 and child.matrix[3] == 0)
                ):
                    cur_v = True

                # Parentheses opened inside a group stay in the group until balanced.
                if not cur_v:
                    if vstk and child.get_text() == "(":
                        cur_v = True
                        vbkt += 1
                    if vbkt and child.get_text() == ")":
                        cur_v = True
                        vbkt -= 1

                # Does the group being collected end at this character?
                if (
                    not cur_v
                    or cls != xt_cls
                    or (sstk[-1] != "" and abs(child.x0 - xt.x0) > vmax)
                ):
                    if vstk:
                        # Text directly to the right of the group fixes its vertical offset.
                        if (
                            not cur_v
                            and cls == xt_cls
                            and child.x0 > max(vch.x0 for vch in vstk)
                        ):
                            vfix = vstk[0].y0 - child.y0
                        # A paragraph consisting only of a group is never continued.
                        if sstk[-1] == "":
                            xt_cls = -1
                        sstk[-1] += f"{{v{len(var)}}}"
                        var.append(vstk)
                        varl.append(vlstk)
                        varf.append(vfix)
                        vstk = []
                        vlstk = []
                        vfix = 0

                # Plain text, or the first character of a new group.
                if not vstk:
                    if cls == xt_cls:
                        if child.x0 > xt.x1 + 1:
                            # Horizontal gap -> inline space.
                            sstk[-1] += " "
                        elif child.x1 < xt.x0:
                            # Jumped back to the left -> line break inside the paragraph.
                            sstk[-1] += " "
                            pstk[-1].brk = True
                    else:
                        # Different layout class -> start a new paragraph at this character.
                        sstk.append("")
                        pstk.append(Paragraph(child.y0, child.x0, child.x0, child.x0, child.y0, child.y1, child.size, False))

                if not cur_v:
                    # A larger (or second) character redefines the paragraph font size.
                    if (
                        child.size > pstk[-1].size
                        or len(sstk[-1].strip()) == 1
                    ) and child.get_text() != " ":
                        pstk[-1].y -= child.size - pstk[-1].size
                        pstk[-1].size = child.size
                    sstk[-1] += child.get_text()
                else:
                    # Text directly to the left of a new group fixes its vertical offset.
                    if (
                        not vstk
                        and cls == xt_cls
                        and child.x0 > xt.x0
                    ):
                        vfix = child.y0 - xt.y0
                    vstk.append(child)

                # Grow the paragraph bounding box.
                pstk[-1].x0 = min(pstk[-1].x0, child.x0)
                pstk[-1].x1 = max(pstk[-1].x1, child.x1)
                pstk[-1].y0 = min(pstk[-1].y0, child.y0)
                pstk[-1].y1 = max(pstk[-1].y1, child.y1)

                xt = child
                xt_cls = cls
            elif isinstance(child, LTFigure):
                pass
            elif isinstance(child, LTLine):
                cls = layout_class(child)
                if vstk and cls == xt_cls:
                    # Line inside the group being collected.
                    vlstk.append(child)
                else:
                    # Page-level line, redrawn unchanged.
                    lstk.append(child)
            else:
                pass

        # Flush a group that is still open at the end of the page.
        if vstk:
            sstk[-1] += f"{{v{len(var)}}}"
            var.append(vstk)
            varl.append(vlstk)
            varf.append(vfix)

        debug_enabled = log.isEnabledFor(logging.DEBUG)
        log.debug("\n==========[VSTACK]==========\n")
        for vidx, group in enumerate(var):
            # Width of a group: rightmost edge minus the first character's left edge.
            width = max(vch.x1 for vch in group) - group[0].x0
            if debug_enabled:
                group_text = "".join([ch.get_text() for ch in group])
                log.debug(f'< {width:.1f} {group[0].x0:.1f} {group[0].y0:.1f} {group[0].cid} {group[0].fontname} {len(varl[vidx])} > v{vidx} = {group_text}')
            vlen.append(width)

        ############################################################
        # B. Translate the paragraphs.
        log.debug("\n==========[SSTACK]==========\n")

        # NOTE: no stop condition is configured, so a failing translation is
        # retried once per second for as long as it keeps raising.
        @retry(wait=wait_fixed(1))
        def worker(s: str):
            """Translate one paragraph; blank strings and lone placeholders pass through."""
            if not s.strip() or re.match(r"^\{v\d+\}$", s):
                return s
            try:
                return self.translator.translate(s)
            except BaseException as e:
                # Full traceback only at DEBUG level; always re-raise for the retry wrapper.
                if log.isEnabledFor(logging.DEBUG):
                    log.exception(e)
                else:
                    log.exception(e, exc_info=False)
                raise

        # ThreadPoolExecutor rejects max_workers <= 0, and 0 is this class's
        # own default, so 0 is mapped to None (the executor's default size).
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.thread or None
        ) as executor:
            news = list(executor.map(worker, sstk))

        ############################################################
        # C. Typeset the translated paragraphs.
        def raw_string(fcur: str, cstk: str):
            """Hex-encode ``cstk`` for font ``fcur``."""
            if fcur == self.noto_name:
                # Fallback font: 4 hex digits of the value returned by has_glyph().
                return "".join(["%04x" % self.noto.has_glyph(ord(c)) for c in cstk])
            elif isinstance(self.fontmap[fcur], PDFCIDFont):
                # CID fonts: 4 hex digits per character.
                return "".join(["%04x" % ord(c) for c in cstk])
            else:
                # Other fonts: 2 hex digits per character.
                return "".join(["%02x" % ord(c) for c in cstk])

        # Line-height multiplier per target language; 1.1 when the language is not listed.
        LANG_LINEHEIGHT_MAP = {
            "zh-cn": 1.4, "zh-tw": 1.4, "zh-hans": 1.4, "zh-hant": 1.4, "zh": 1.4,
            "ja": 1.1, "ko": 1.2, "en": 1.2, "ar": 1.0, "ru": 0.8, "uk": 0.8, "ta": 0.8
        }
        default_line_height = LANG_LINEHEIGHT_MAP.get(self.translator.lang_out.lower(), 1.1)
        _x, _y = 0, 0  # previous cursor position, used only for DEBUG trace lines
        ops_list = []

        def gen_op_txt(font, size, x, y, rtxt):
            """Operator string: select font, set text matrix to (x, y), show hex string."""
            return f"/{font} {size:f} Tf 1 0 0 1 {x:f} {y:f} Tm [<{rtxt}>] TJ "

        def gen_op_line(x, y, xlen, ylen, linewidth):
            """Operator string: leave text mode, stroke one line segment, re-enter text mode."""
            return f"ET q 1 0 0 1 {x:f} {y:f} cm [] 0 d 0 J {linewidth:f} w 0 0 m {xlen:f} {ylen:f} l S Q BT "

        for pidx, new in enumerate(news):
            para = pstk[pidx]
            x: float = para.x                   # current cursor x
            y: float = para.y                   # paragraph baseline of the first line
            x0: float = para.x0                 # left bound
            x1: float = para.x1                 # right bound
            height: float = para.y1 - para.y0   # paragraph height
            size: float = para.size             # font size
            brk: bool = para.brk                # whether wrapping is allowed
            cstk: str = ""                      # pending text run
            fcur: str = None                    # font of the pending text run
            lidx = 0                            # number of wraps so far
            tx = x                              # x where the pending text run starts
            fcur_ = fcur                        # font required by the current token
            ptr = 0
            if debug_enabled:
                log.debug(f"< {y} {x} {x0} {x1} {size} {brk} > {sstk[pidx]} | {new}")

            # Operations are collected first; y is resolved once the final
            # line height of the paragraph is known.
            ops_vals: list[dict] = []

            while ptr < len(new):
                vy_regex = re.match(
                    r"\{\s*v([\d\s]+)\}", new[ptr:], re.IGNORECASE
                )
                mod = 0  # width given back when the group ends in a modifier character
                if vy_regex:
                    ptr += len(vy_regex.group(0))
                    try:
                        vid = int(vy_regex.group(1).replace(" ", ""))
                        adv = vlen[vid]
                    except (ValueError, IndexError):
                        # Malformed or out-of-range placeholder in the translated text: skip it.
                        continue
                    last_text = var[vid][-1].get_text()
                    if last_text and unicodedata.category(last_text[0]) in ["Lm", "Mn", "Sk"]:
                        mod = var[vid][-1].width
                else:
                    ch = new[ptr]
                    # Prefer the "tiro" font when it round-trips the character,
                    # otherwise fall back to the noto font. Any lookup failure
                    # (missing font, unmapped code, ...) deliberately means "not tiro".
                    try:
                        use_tiro = self.fontmap["tiro"].to_unichr(ord(ch)) == ch
                    except Exception:
                        use_tiro = False
                    fcur_ = "tiro" if use_tiro else self.noto_name
                    if fcur_ == self.noto_name:
                        adv = self.noto.char_lengths(ch, size)[0]
                    else:
                        adv = self.fontmap[fcur_].char_width(ord(ch)) * size
                    ptr += 1

                # Small tolerance (10% of the font size) before the right bound counts as exceeded.
                overflow = x + adv > x1 + 0.1 * size

                # Flush the pending run on font change, placeholder, or overflow.
                if fcur_ != fcur or vy_regex or overflow:
                    if cstk:
                        ops_vals.append({
                            "type": OpType.TEXT,
                            "font": fcur,
                            "size": size,
                            "x": tx,
                            "dy": 0,
                            "rtxt": raw_string(fcur, cstk),
                            "lidx": lidx
                        })
                        cstk = ""
                # Wrap only when the original paragraph contained a line break.
                if brk and overflow:
                    x = x0
                    lidx += 1

                if vy_regex:
                    fix = 0
                    if fcur is not None:
                        # Apply the vertical offset only when text precedes the group.
                        fix = varf[vid]
                    origin = var[vid][0]
                    for vch in var[vid]:
                        vc = chr(vch.cid)
                        ops_vals.append({
                            "type": OpType.TEXT,
                            "font": self.fontid[vch.font],
                            "size": vch.size,
                            "x": x + vch.x0 - origin.x0,
                            "dy": fix + vch.y0 - origin.y0,
                            "rtxt": raw_string(self.fontid[vch.font], vc),
                            "lidx": lidx
                        })
                        if debug_enabled:
                            lstk.append(LTLine(0.1, (_x, _y), (x + vch.x0 - origin.x0, fix + y + vch.y0 - origin.y0)))
                            _x, _y = x + vch.x0 - origin.x0, fix + y + vch.y0 - origin.y0
                    for line in varl[vid]:
                        # Lines with width >= 5 are not redrawn.
                        if line.linewidth < 5:
                            ops_vals.append({
                                "type": OpType.LINE,
                                "x": line.pts[0][0] + x - origin.x0,
                                "dy": line.pts[0][1] + fix - origin.y0,
                                "linewidth": line.linewidth,
                                "xlen": line.pts[1][0] - line.pts[0][0],
                                "ylen": line.pts[1][1] - line.pts[0][1],
                                "lidx": lidx
                            })
                else:
                    if not cstk:
                        # Start of a new run.
                        tx = x
                        if x == x0 and ch == " ":
                            # Drop a space that would start a line.
                            adv = 0
                        else:
                            cstk += ch
                    else:
                        cstk += ch

                adv -= mod
                fcur = fcur_
                x += adv
                if debug_enabled:
                    lstk.append(LTLine(0.1, (_x, _y), (x, y)))
                    _x, _y = x, y

            # Flush the trailing run of the paragraph.
            if cstk:
                ops_vals.append({
                    "type": OpType.TEXT,
                    "font": fcur,
                    "size": size,
                    "x": tx,
                    "dy": 0,
                    "rtxt": raw_string(fcur, cstk),
                    "lidx": lidx
                })

            # Shrink the line height in 0.05 steps while the lines do not fit
            # the original paragraph height (stops once it drops below 1).
            line_height = default_line_height
            while (lidx + 1) * size * line_height > height and line_height >= 1:
                line_height -= 0.05

            for vals in ops_vals:
                line_y = vals["dy"] + y - vals["lidx"] * size * line_height
                if vals["type"] == OpType.TEXT:
                    ops_list.append(gen_op_txt(vals["font"], vals["size"], vals["x"], line_y, vals["rtxt"]))
                elif vals["type"] == OpType.LINE:
                    ops_list.append(gen_op_line(vals["x"], line_y, vals["xlen"], vals["ylen"], vals["linewidth"]))

        # Redraw page-level lines (and DEBUG trace lines); width >= 5 is skipped.
        for line in lstk:
            if line.linewidth < 5:
                ops_list.append(gen_op_line(line.pts[0][0], line.pts[0][1], line.pts[1][0] - line.pts[0][0], line.pts[1][1] - line.pts[0][1], line.linewidth))

        return f"BT {''.join(ops_list)}ET "
|
|
|
|
|
class OpType(Enum):
    """Kind of drawing operation queued while typesetting a paragraph."""

    # A run of hex-encoded glyphs shown with a text operator.
    TEXT = "text"

    # A stroked line segment.
    LINE = "line"
|
|