import re  # Point 1: Pre-Compile Regular Expressions
import time
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import asyncio, requests, os
from .config import *
from bs4 import element, NavigableString, Comment
from bs4 import PageElement, Tag
from urllib.parse import urljoin
from requests.exceptions import InvalidSchema
# from .content_cleaning_strategy import ContentCleaningStrategy
from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter  # , HeuristicContentFilter
from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator
from .models import MarkdownGenerationResult
from .utils import (
    extract_metadata,
    normalize_url,
    is_external_url,
    get_base_domain,
)
# Pre-compile regular expressions for Open Graph and Twitter metadata
OG_REGEX = re.compile(r'^og:')
TWITTER_REGEX = re.compile(r'^twitter:')
DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")

# Function to parse image height/width value and units
def parse_dimension(dimension):
    if dimension:
        # match = re.match(r"(\d+)(\D*)", dimension)
        match = DIMENSION_REGEX.match(dimension)
        if match:
            number = int(match.group(1))
            unit = match.group(2) or 'px'  # Default unit is 'px' if not specified
            return number, unit
    return None, None
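# A quick sketch of the expected behaviour, for reference (illustrative values):
#   parse_dimension("640px") -> (640, 'px')
#   parse_dimension("75%")   -> (75, '%')
#   parse_dimension(None)    -> (None, None)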
# Fetch image file metadata to extract size and extension
def fetch_image_file_size(img, base_url):
    # If src is a relative path, construct the full URL; otherwise it may be a CDN URL
    img_url = urljoin(base_url, img.get('src'))
    try:
        response = requests.head(img_url)
        if response.status_code == 200:
            return response.headers.get('Content-Length', None)
        else:
            print(f"Failed to retrieve file size for {img_url}")
            return None
    except InvalidSchema:
        return None
class ContentScrapingStrategy(ABC):
    """Abstract base class for content scraping strategies."""

    @abstractmethod
    def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
        pass

    @abstractmethod
    async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
        pass
class WebScrapingStrategy(ContentScrapingStrategy):
    """
    Class for web content scraping. Perhaps the most important class.

    How it works:
    1. Extract content from HTML using BeautifulSoup.
    2. Clean the extracted content using a content cleaning strategy.
    3. Filter the cleaned content using a content filtering strategy.
    4. Generate markdown content from the filtered content.
    5. Return the markdown content.
    """
    def __init__(self, logger=None):
        self.logger = logger

    def _log(self, level, message, tag="SCRAPE", **kwargs):
        """Helper method to safely use logger."""
        if self.logger:
            log_method = getattr(self.logger, level)
            log_method(message=message, tag=tag, **kwargs)
    def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
        """
        Main entry point for content scraping.

        Args:
            url (str): The URL of the page to scrape.
            html (str): The HTML content of the page.
            **kwargs: Additional keyword arguments.

        Returns:
            Dict[str, Any]: A dictionary containing the scraped content, with the following keys:
                - 'markdown': The generated markdown content. Currently a str; it will soon become a
                  MarkdownGenerationResult, with the raw text available via 'markdown.raw_markdown'.
                - 'fit_markdown': The markdown generated from the relevance-filtered content. This key will be
                  removed soon and exposed as 'markdown.fit_markdown'.
                - 'fit_html': The relevance-filtered HTML content. This key will be removed soon and exposed
                  as 'markdown.fit_html'.
                - 'markdown_v2': The markdown generated from the relevance-filtered content. This key is
                  temporary and will be replaced by 'markdown'.
        """
        return self._scrap(url, html, is_async=False, **kwargs)
    async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
        """
        Main entry point for asynchronous content scraping.

        Args:
            url (str): The URL of the page to scrape.
            html (str): The HTML content of the page.
            **kwargs: Additional keyword arguments.

        Returns:
            Dict[str, Any]: A dictionary containing the scraped content, with the same keys as `scrap()`:
                'markdown', 'fit_markdown', 'fit_html', and 'markdown_v2' (see `scrap()` for details).
        """
        return await asyncio.to_thread(self._scrap, url, html, **kwargs)
    def flatten_nested_elements(self, node):
        """
        Flatten nested elements in an HTML tree.

        Args:
            node (Tag): The root node of the HTML tree.

        Returns:
            Tag: The flattened HTML tree.
        """
        if isinstance(node, NavigableString):
            return node
        if len(node.contents) == 1 and isinstance(node.contents[0], Tag) and node.contents[0].name == node.name:
            return self.flatten_nested_elements(node.contents[0])
        node.contents = [self.flatten_nested_elements(child) for child in node.contents]
        return node
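    # Sketch of the effect (illustrative): a wrapper chain such as
    #   <div><div><p>text</p></div></div>
    # collapses to
    #   <div><p>text</p></div>
    # because same-named single-child wrappers are folded into their child.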
    def find_closest_parent_with_useful_text(self, tag, **kwargs):
        """
        Find the closest parent with useful text.

        Args:
            tag (Tag): The starting tag to search from.
            **kwargs: Additional keyword arguments.

        Returns:
            str: The text content of the closest parent that meets the word-count threshold,
                or None if no such parent is found.
        """
        image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD)
        current_tag = tag
        while current_tag:
            current_tag = current_tag.parent
            # Get the text content of the parent tag
            if current_tag:
                text_content = current_tag.get_text(separator=' ', strip=True)
                # Check if the text content has at least image_description_min_word_threshold words
                if len(text_content.split()) >= image_description_min_word_threshold:
                    return text_content
        return None
    def remove_unwanted_attributes(self, element, important_attrs, keep_data_attributes=False):
        """
        Remove unwanted attributes from an HTML element.

        Args:
            element (Tag): The HTML element to remove attributes from.
            important_attrs (list): List of important attributes to keep.
            keep_data_attributes (bool): Whether to keep data attributes.

        Returns:
            None
        """
        attrs_to_remove = []
        for attr in element.attrs:
            if attr not in important_attrs:
                if keep_data_attributes:
                    if not attr.startswith('data-'):
                        attrs_to_remove.append(attr)
                else:
                    attrs_to_remove.append(attr)
        for attr in attrs_to_remove:
            del element[attr]
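    # For example (illustrative): given <a href="/x" onclick="..." class="btn" data-id="7">, with
    # important_attrs=['href'] and keep_data_attributes=True, only `href` and `data-id` survive.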
    def process_image(self, img, url, index, total_images, **kwargs):
        """
        Process an image element.

        How it works:
        1. Check that the image is visibly displayed and not inside undesired HTML elements.
        2. Score the image for its usefulness.
        3. Detect the image format from the available source attributes.
        4. Build a dictionary for each unique source variant of the image.
        5. Return the list of processed image variants.

        Args:
            img (Tag): The image element to process.
            url (str): The URL of the page containing the image.
            index (int): The index of the image in the list of images.
            total_images (int): The total number of images in the list.
            **kwargs: Additional keyword arguments.

        Returns:
            list: A list of dictionaries, one per unique image source, or None if the image is skipped.
        """
        # Parse a srcset attribute into a list of {'url', 'width'} entries
        parse_srcset = lambda s: [
            {'url': u.strip().split()[0],
             'width': u.strip().split()[-1].rstrip('w') if ' ' in u else None}
            for u in [f"http{p}" for p in s.split("http") if p]
        ]
        # Constants for checks
        classes_to_check = frozenset(['button', 'icon', 'logo'])
        tags_to_check = frozenset(['button', 'input'])
        image_formats = frozenset(['jpg', 'jpeg', 'png', 'webp', 'avif', 'gif'])

        # Pre-fetch commonly used attributes
        style = img.get('style', '')
        alt = img.get('alt', '')
        src = img.get('src', '')
        data_src = img.get('data-src', '')
        srcset = img.get('srcset', '')
        data_srcset = img.get('data-srcset', '')
        width = img.get('width')
        height = img.get('height')
        parent = img.parent
        parent_classes = parent.get('class', [])

        # Quick validation checks
        if ('display:none' in style or
                parent.name in tags_to_check or
                any(c in cls for c in parent_classes for cls in classes_to_check) or
                any(c in src for c in classes_to_check) or
                any(c in alt for c in classes_to_check)):
            return None

        # Quick score calculation
        score = 0
        if width and width.isdigit():
            width_val = int(width)
            score += 1 if width_val > 150 else 0
        if height and height.isdigit():
            height_val = int(height)
            score += 1 if height_val > 150 else 0
        if alt:
            score += 1
        score += index / total_images < 0.5  # Bonus for images in the first half of the page

        # image_format = ''
        # if "data:image/" in src:
        #     image_format = src.split(',')[0].split(';')[0].split('/')[1].split(';')[0]
        # else:
        #     image_format = os.path.splitext(src)[1].lower().strip('.').split('?')[0]
        # if image_format in ('jpg', 'png', 'webp', 'avif'):
        #     score += 1

        # Check for image format in all possible sources
        def has_image_format(url):
            return any(fmt in url.lower() for fmt in image_formats)

        # Score for having proper image sources
        if any(has_image_format(url) for url in [src, data_src, srcset, data_srcset]):
            score += 1
        if srcset or data_srcset:
            score += 1
        if img.find_parent('picture'):
            score += 1

        # Detect format from any available source
        detected_format = None
        for url in [src, data_src, srcset, data_srcset]:
            if url:
                format_matches = [fmt for fmt in image_formats if fmt in url.lower()]
                if format_matches:
                    detected_format = format_matches[0]
                    break

        if score <= kwargs.get('image_score_threshold', IMAGE_SCORE_THRESHOLD):
            return None
        # Use a set for deduplication
        unique_urls = set()
        image_variants = []

        # Generate a unique group ID for this set of variants
        group_id = index

        # Base image info template
        image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD)
        base_info = {
            'alt': alt,
            'desc': self.find_closest_parent_with_useful_text(img, **kwargs),
            'score': score,
            'type': 'image',
            'group_id': group_id,  # Group ID for this set of variants
            'format': detected_format,
        }

        # Inline function for adding variants
        def add_variant(src, width=None):
            if src and not src.startswith('data:') and src not in unique_urls:
                unique_urls.add(src)
                image_variants.append({**base_info, 'src': src, 'width': width})

        # Process all sources
        add_variant(src)
        add_variant(data_src)

        # Handle srcset and data-srcset in one pass
        for attr in ('srcset', 'data-srcset'):
            if value := img.get(attr):
                for source in parse_srcset(value):
                    add_variant(source['url'], source['width'])

        # Quick picture element check
        if picture := img.find_parent('picture'):
            for source in picture.find_all('source'):
                if srcset := source.get('srcset'):
                    for src in parse_srcset(srcset):
                        add_variant(src['url'], src['width'])

        # Framework-specific attributes in one pass
        for attr, value in img.attrs.items():
            if attr.startswith('data-') and ('src' in attr or 'srcset' in attr) and 'http' in value:
                add_variant(value)

        return image_variants if image_variants else None
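    # For reference, each returned variant is shaped roughly like this (illustrative values):
    #   {'alt': 'Team photo', 'desc': '...surrounding text...', 'score': 4, 'type': 'image',
    #    'group_id': 3, 'format': 'webp', 'src': 'https://cdn.example.com/img-800.webp', 'width': '800'}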
    def process_element(self, url, element: PageElement, **kwargs) -> Dict[str, Any]:
        """
        Process an HTML element.

        How it works:
        1. Check if the element is an image, video, or audio.
        2. Extract the element's attributes and content.
        3. Process the element based on its type.
        4. Return the processed element information.

        Args:
            url (str): The URL of the page containing the element.
            element (Tag): The HTML element to process.
            **kwargs: Additional keyword arguments.

        Returns:
            dict: A dictionary containing the processed element information.
        """
        media = {'images': [], 'videos': [], 'audios': []}
        internal_links_dict = {}
        external_links_dict = {}
        self._process_element(
            url,
            element,
            media,
            internal_links_dict,
            external_links_dict,
            **kwargs
        )
        return {
            'media': media,
            'internal_links_dict': internal_links_dict,
            'external_links_dict': external_links_dict
        }
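    # The returned dictionary is keyed exactly as above, e.g. (illustrative):
    #   {'media': {'images': [...], 'videos': [], 'audios': []},
    #    'internal_links_dict': {'https://example.com/about': {...}},
    #    'external_links_dict': {}}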
    def _process_element(self, url, element: PageElement, media: Dict[str, Any], internal_links_dict: Dict[str, Any], external_links_dict: Dict[str, Any], **kwargs) -> bool:
        """
        Process an HTML element.
        """
        try:
            if isinstance(element, NavigableString):
                if isinstance(element, Comment):
                    element.extract()
                return False

            # if element.name == 'img':
            #     process_image(element, url, 0, 1)
            #     return True

            base_domain = kwargs.get("base_domain", get_base_domain(url))

            if element.name in ['script', 'style', 'link', 'meta', 'noscript']:
                element.decompose()
                return False

            keep_element = False

            exclude_domains = kwargs.get('exclude_domains', [])
            # exclude_social_media_domains = kwargs.get('exclude_social_media_domains', set(SOCIAL_MEDIA_DOMAINS))
            # exclude_social_media_domains = SOCIAL_MEDIA_DOMAINS + kwargs.get('exclude_social_media_domains', [])
            # exclude_social_media_domains = list(set(exclude_social_media_domains))

            try:
                if element.name == 'a' and element.get('href'):
                    href = element.get('href', '').strip()
                    if not href:  # Skip empty hrefs
                        return False
                    url_base = url.split('/')[2]

                    # Normalize the URL
                    try:
                        normalized_href = normalize_url(href, url)
                    except ValueError as e:
                        # logging.warning(f"Invalid URL format: {href}, Error: {str(e)}")
                        return False
                    link_data = {
                        'href': normalized_href,
                        'text': element.get_text().strip(),
                        'title': element.get('title', '').strip(),
                        'base_domain': base_domain
                    }

                    is_external = is_external_url(normalized_href, base_domain)

                    keep_element = True

                    # Handle external link exclusions
                    if is_external:
                        link_base_domain = get_base_domain(normalized_href)
                        link_data['base_domain'] = link_base_domain

                        if kwargs.get('exclude_external_links', False):
                            element.decompose()
                            return False
                        # elif kwargs.get('exclude_social_media_links', False):
                        #     if link_base_domain in exclude_social_media_domains:
                        #         element.decompose()
                        #         return False
                        #     if any(domain in normalized_href.lower() for domain in exclude_social_media_domains):
                        #         element.decompose()
                        #         return False
                        elif exclude_domains:
                            if link_base_domain in exclude_domains:
                                element.decompose()
                                return False
                            # if any(domain in normalized_href.lower() for domain in kwargs.get('exclude_domains', [])):
                            #     element.decompose()
                            #     return False

                    if is_external:
                        if normalized_href not in external_links_dict:
                            external_links_dict[normalized_href] = link_data
                    else:
                        if normalized_href not in internal_links_dict:
                            internal_links_dict[normalized_href] = link_data

            except Exception as e:
                raise Exception(f"Error processing links: {str(e)}")
            try:
                if element.name == 'img':
                    potential_sources = ['src', 'data-src', 'srcset', 'data-lazy-src', 'data-original']
                    src = element.get('src', '')
                    while not src and potential_sources:
                        src = element.get(potential_sources.pop(0), '')
                    if not src:
                        element.decompose()
                        return False

                    # If it is a srcset, pick the first image
                    if 'srcset' in element.attrs:
                        src = element.attrs['srcset'].split(',')[0].split(' ')[0]

                    # If the image src is internal, keep the element as is
                    if not is_external_url(src, base_domain):
                        return True

                    image_src_base_domain = get_base_domain(src)

                    # Check flag if we should remove external images
                    if kwargs.get('exclude_external_images', False):
                        element.decompose()
                        return False
                    # src_url_base = src.split('/')[2]
                    # url_base = url.split('/')[2]
                    # if url_base not in src_url_base:
                    #     element.decompose()
                    #     return False

                    # if kwargs.get('exclude_social_media_links', False):
                    #     if image_src_base_domain in exclude_social_media_domains:
                    #         element.decompose()
                    #         return False
                    #     src_url_base = src.split('/')[2]
                    #     url_base = url.split('/')[2]
                    #     if any(domain in src for domain in exclude_social_media_domains):
                    #         element.decompose()
                    #         return False

                    # Handle exclude domains
                    if exclude_domains:
                        if image_src_base_domain in exclude_domains:
                            element.decompose()
                            return False
                        # if any(domain in src for domain in kwargs.get('exclude_domains', [])):
                        #     element.decompose()
                        #     return False

                    return True  # Always keep image elements
            except Exception as e:
                raise Exception(f"Error processing images: {str(e)}")
            # Check if the flag to remove all forms is set
            if kwargs.get('remove_forms', False) and element.name == 'form':
                element.decompose()
                return False

            if element.name in ['video', 'audio']:
                media[f"{element.name}s"].append({
                    'src': element.get('src'),
                    'alt': element.get('alt'),
                    'type': element.name,
                    'description': self.find_closest_parent_with_useful_text(element, **kwargs)
                })
                source_tags = element.find_all('source')
                for source_tag in source_tags:
                    media[f"{element.name}s"].append({
                        'src': source_tag.get('src'),
                        'alt': element.get('alt'),
                        'type': element.name,
                        'description': self.find_closest_parent_with_useful_text(element, **kwargs)
                    })
                return True  # Always keep video and audio elements

            if element.name in ONLY_TEXT_ELIGIBLE_TAGS:
                if kwargs.get('only_text', False):
                    element.replace_with(element.get_text())

            try:
                self.remove_unwanted_attributes(element, IMPORTANT_ATTRS, kwargs.get('keep_data_attributes', False))
            except Exception as e:
                # print('Error removing unwanted attributes:', str(e))
                self._log('error',
                          message="Error removing unwanted attributes: {error}",
                          tag="SCRAPE",
                          params={"error": str(e)}
                          )

            # Process children
            for child in list(element.children):
                if isinstance(child, NavigableString) and not isinstance(child, Comment):
                    if len(child.strip()) > 0:
                        keep_element = True
                else:
                    if self._process_element(url, child, media, internal_links_dict, external_links_dict, **kwargs):
                        keep_element = True

            # Check word count
            word_count_threshold = kwargs.get('word_count_threshold', MIN_WORD_THRESHOLD)
            if not keep_element:
                word_count = len(element.get_text(strip=True).split())
                keep_element = word_count >= word_count_threshold

            if not keep_element:
                element.decompose()

            return keep_element
        except Exception as e:
            # print('Error processing element:', str(e))
            self._log('error',
                      message="Error processing element: {error}",
                      tag="SCRAPE",
                      params={"error": str(e)}
                      )
            return False
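    # Note: `_process_element` returns True when the element (or one of its descendants) is worth
    # keeping, and False when it has been decomposed; callers use this to prune empty branches.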
    def _scrap(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]:
        """
        Extract content from HTML using BeautifulSoup.

        Args:
            url (str): The URL of the page to scrape.
            html (str): The HTML content of the page to scrape.
            word_count_threshold (int): The minimum word count threshold for content extraction.
            css_selector (str): The CSS selector to use for content extraction.
            **kwargs: Additional keyword arguments.

        Returns:
            dict: A dictionary containing the extracted content.
        """
        success = True
        if not html:
            return None

        parser_type = kwargs.get('parser', 'lxml')
        soup = BeautifulSoup(html, parser_type)
        body = soup.body
        base_domain = get_base_domain(url)
        try:
            meta = extract_metadata("", soup)
        except Exception as e:
            self._log('error',
                      message="Error extracting metadata: {error}",
                      tag="SCRAPE",
                      params={"error": str(e)}
                      )
            meta = {}

        # Handle tag-based removal first - faster than CSS selection
        excluded_tags = set(kwargs.get('excluded_tags', []) or [])
        if excluded_tags:
            for element in body.find_all(lambda tag: tag.name in excluded_tags):
                element.extract()

        # Handle CSS selector-based removal
        excluded_selector = kwargs.get('excluded_selector', '')
        if excluded_selector:
            is_single_selector = ',' not in excluded_selector and ' ' not in excluded_selector
            if is_single_selector:
                while element := body.select_one(excluded_selector):
                    element.extract()
            else:
                for element in body.select(excluded_selector):
                    element.extract()

        if css_selector:
            selected_elements = body.select(css_selector)
            if not selected_elements:
                return {
                    'markdown': '',
                    'cleaned_html': '',
                    'success': True,
                    'media': {'images': [], 'videos': [], 'audios': []},
                    'links': {'internal': [], 'external': []},
                    'metadata': {},
                    'message': f"No elements found for CSS selector: {css_selector}"
                }
                # raise InvalidCSSSelectorError(f"Invalid CSS selector, No elements found for CSS selector: {css_selector}")
            body = soup.new_tag('div')
            for el in selected_elements:
                body.append(el)

        kwargs['exclude_social_media_domains'] = set(kwargs.get('exclude_social_media_domains', []) + SOCIAL_MEDIA_DOMAINS)
        kwargs['exclude_domains'] = set(kwargs.get('exclude_domains', []))
        if kwargs.get('exclude_social_media_links', False):
            kwargs['exclude_domains'] = kwargs['exclude_domains'].union(kwargs['exclude_social_media_domains'])
        result_obj = self.process_element(
            url,
            body,
            word_count_threshold=word_count_threshold,
            base_domain=base_domain,
            **kwargs
        )

        links = {'internal': [], 'external': []}
        media = result_obj['media']
        internal_links_dict = result_obj['internal_links_dict']
        external_links_dict = result_obj['external_links_dict']

        # Update the links dictionary with unique links
        links['internal'] = list(internal_links_dict.values())
        links['external'] = list(external_links_dict.values())

        # Process images and flatten the per-image variant lists into media['images']
        imgs = body.find_all('img')
        media['images'] = [
            img for result in (self.process_image(img, url, i, len(imgs))
                               for i, img in enumerate(imgs))
            if result is not None
            for img in result
        ]

        body = self.flatten_nested_elements(body)

        # Strip inline base64 image data so it does not bloat the cleaned HTML
        base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)')
        for img in imgs:
            src = img.get('src', '')
            if base64_pattern.match(src):
                # Replace base64 data with an empty string
                img['src'] = base64_pattern.sub('', src)
str_body = "" | |
try: | |
str_body = body.encode_contents().decode('utf-8') | |
except Exception as e: | |
# Reset body to the original HTML | |
success = False | |
body = BeautifulSoup(html, 'html.parser') | |
# Create a new div with a special ID | |
error_div = body.new_tag('div', id='crawl4ai_error_message') | |
error_div.string = ''' | |
Crawl4AI Error: This page is not fully supported. | |
Possible reasons: | |
1. The page may have restrictions that prevent crawling. | |
2. The page might not be fully loaded. | |
Suggestions: | |
- Try calling the crawl function with these parameters: | |
magic=True, | |
- Set headless=False to visualize what's happening on the page. | |
If the issue persists, please check the page's structure and any potential anti-crawling measures. | |
''' | |
# Append the error div to the body | |
body.body.append(error_div) | |
str_body = body.encode_contents().decode('utf-8') | |
print(f"[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.") | |
self._log('error', | |
message="After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.", | |
tag="SCRAPE" | |
) | |
cleaned_html = str_body.replace('\n\n', '\n').replace(' ', ' ') | |
        return {
            # **markdown_content,
            'cleaned_html': cleaned_html,
            'success': success,
            'media': media,
            'links': links,
            'metadata': meta
        }