Spaces:
Runtime error
Runtime error
| import re | |
| from collections import OrderedDict, defaultdict | |
| import pymupdf | |
def sort_text(chunks, x_threshold=300):
    """Order the chunks of a two-column page: left column first, then right.

    Args:
        chunks: Iterable of dicts whose "coordinates" entry is an
            ``[x, y]`` pair.
        x_threshold: x value that separates the two columns. Chunks whose
            x is strictly below it go to the left column, all others to
            the right column. Defaults to 300.

    Returns:
        A new list holding the left-column chunks sorted by ascending y,
        followed by the right-column chunks sorted by ascending y. The
        input is not modified.
    """

    def y_of(chunk):
        return chunk["coordinates"][1]

    left_column = []
    right_column = []
    # Single pass so that one-shot iterables (generators) are supported too.
    for chunk in chunks:
        if chunk["coordinates"][0] < x_threshold:
            left_column.append(chunk)
        else:
            right_column.append(chunk)

    # sorted() is stable: chunks sharing the same y keep their input order.
    return sorted(left_column, key=y_of) + sorted(right_column, key=y_of)
def majority_element(spans, param):
    """Return the ``param`` value that covers the most characters in ``spans``.

    Each span contributes ``len(span["text"])`` to the tally of its
    ``span[param]`` value (for example its color or size). The value with
    the largest tally wins; on a tie the value encountered first wins.
    Returns None when ``spans`` is empty.
    """
    weight_by_value = {}
    for item in spans:
        n_chars = len(item["text"])
        value = item[param]
        weight_by_value[value] = weight_by_value.get(value, 0) + n_chars

    if not weight_by_value:
        return None
    # max() keeps the first maximal key, i.e. the earliest-seen value on ties.
    return max(weight_by_value, key=weight_by_value.get)
def clean_text(text):
    """Remove repeated words (meant for OCR duplication) and tidy whitespace.

    The text is split on whitespace and only the first occurrence of each
    distinct word is kept, in original order; the survivors are joined with
    single spaces.

    NOTE(review): every later repetition of a word is dropped, not only
    adjacent ones, so "the cat and the dog" becomes "the cat and dog".
    """
    # dict preserves insertion order, so fromkeys() de-duplicates while
    # keeping first occurrences in place. Joining split() output with " "
    # already yields single-spaced text with no leading/trailing blanks.
    first_occurrences = dict.fromkeys(text.split())
    return " ".join(first_occurrences)
def get_chunks(doc, min_size=9):
    """Extract one text chunk per text block from every page of ``doc``.

    For each page, the blocks returned by ``page.get_text("dict")`` that
    have no "image" key are scanned line by line and span by span. Spans
    whose "size" is not strictly greater than ``min_size`` are ignored.
    The remaining span texts are joined with spaces and passed through
    ``clean_text``. Blocks that yield no text are skipped. The chunks of
    each page are ordered with ``sort_text`` before being appended to the
    result.

    Args:
        doc: Opened document exposing ``page_count`` and integer page
            indexing (``index_pdf`` passes the object returned by
            ``pymupdf.open``).
        min_size: Spans must have a "size" strictly above this value to
            be included. Defaults to 9.

    Returns:
        A list of dicts with the keys "text", "page" (the 0-based page
        index), "coordinates" (the first two values of the block "bbox",
        rounded to one decimal), "color" and "size" (the values covering
        the most characters among the kept spans, via ``majority_element``).
    """
    all_chunks = []
    for page_num in range(doc.page_count):
        page = doc[page_num]
        page_chunks = []

        for block in page.get_text("dict")["blocks"]:
            # Image blocks are not needed.
            if "image" in block:
                continue

            # Keep the spans themselves: they feed the majority calculation.
            kept_spans = []
            for line in block["lines"]:
                for span in line["spans"]:
                    if span["size"] > min_size:
                        kept_spans.append(span)

            text = " ".join(span["text"] for span in kept_spans).strip()
            if not text:
                continue

            bbox = block["bbox"]
            page_chunks.append(
                {
                    "text": clean_text(text),
                    "page": page_num,
                    "coordinates": [round(bbox[0], 1), round(bbox[1], 1)],
                    "color": majority_element(kept_spans, "color"),
                    "size": majority_element(kept_spans, "size"),
                }
            )

        # Put the page's chunks into column reading order.
        all_chunks.extend(sort_text(page_chunks))
    return all_chunks
def process_activities(chunks):
    """Fold the chunks that follow an "Activity" chunk into that chunk.

    Whenever a chunk's text contains "Activity", the "size" of the chunk
    right after it is taken as the reference. That chunk and every
    directly following chunk with the same "size" have their text appended
    to a copy of the "Activity" chunk, each on a new line. The copy then
    replaces the whole run in the list.

    ``chunks`` is modified in place and the same list object is returned.
    """
    pos = 0
    while pos < len(chunks):
        if "Activity" not in chunks[pos]["text"]:
            pos += 1
            continue

        merged = chunks[pos].copy()  # leave the original dict untouched
        end = pos + 1
        # The size of the first follower defines what belongs to the run.
        run_size = chunks[end]["size"] if end < len(chunks) else None

        pieces = [merged["text"]]
        while end < len(chunks) and chunks[end]["size"] == run_size:
            pieces.append(chunks[end]["text"])
            end += 1
        merged["text"] = "\n".join(pieces)

        # Collapse heading + run into the single merged chunk.
        chunks[pos:end] = [merged]
        pos += 1
    return chunks
def index_pdf(path, buffer=False):
    """Open a PDF, extract its text chunks and group "Activity" sections.

    Args:
        path: Location of the PDF file, or, when ``buffer`` is true, the
            in-memory PDF data handed to ``pymupdf.open`` as ``stream``.
        buffer: When true, ``path`` is treated as in-memory data with
            file type "pdf" instead of a file location.

    Returns:
        The chunk list produced by ``get_chunks`` after it has been passed
        through ``process_activities``.
    """
    if buffer:
        doc = pymupdf.open(stream=path, filetype="pdf")
    else:
        doc = pymupdf.open(path)

    # Close the document once extraction is done, even on error; the
    # chunks hold only extracted plain values, not references to the document.
    with doc:
        chunks = get_chunks(doc)

    chunks = process_activities(chunks)
    print("--- pdf indexed")
    return chunks