import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
import chainlit as cl
from langchain import PromptTemplate
from urllib.parse import urlparse, urljoin, urldefrag
import asyncio
import aiohttp
from aiohttp import ClientSession

try:
    from modules.constants import *
except ImportError:
    from constants import *
""" | |
Ref: https://python.plainenglish.io/scraping-the-subpages-on-a-website-ea2d4e3db113 | |
""" | |

class WebpageCrawler:
    """Asynchronously crawl a website and collect the URLs of its child pages."""

    def __init__(self):
        self.dict_href_links = {}

    async def fetch(self, session: ClientSession, url: str) -> str:
        async with session.get(url) as response:
            try:
                return await response.text()
            except UnicodeDecodeError:
                # Fall back to latin1 for pages that are not valid UTF-8
                return await response.text(encoding="latin1")

    def url_exists(self, url: str) -> bool:
        try:
            response = requests.head(url)
            return response.status_code == 200
        except requests.ConnectionError:
            return False

    async def get_links(self, session: ClientSession, website_link: str, base_url: str):
        html_data = await self.fetch(session, website_link)
        soup = BeautifulSoup(html_data, "html.parser")
        list_links = []
        for link in soup.find_all("a", href=True):
            href = link["href"].strip()
            full_url = urljoin(base_url, href)
            normalized_url = self.normalize_url(full_url)  # sections removed
            if (
                normalized_url not in self.dict_href_links
                and self.is_child_url(normalized_url, base_url)
                and self.url_exists(normalized_url)
            ):
                self.dict_href_links[normalized_url] = None
                list_links.append(normalized_url)
        return list_links

    async def get_subpage_links(
        self, session: ClientSession, urls: list, base_url: str
    ):
        tasks = [self.get_links(session, url, base_url) for url in urls]
        results = await asyncio.gather(*tasks)
        all_links = [link for sublist in results for link in sublist]
        return all_links

    async def get_all_pages(self, url: str, base_url: str):
        async with aiohttp.ClientSession() as session:
            dict_links = {url: "Not-checked"}
            counter = None
            # Breadth-first crawl: keep expanding until no "Not-checked" links remain
            while counter != 0:
                unchecked_links = [
                    link
                    for link, status in dict_links.items()
                    if status == "Not-checked"
                ]
                if not unchecked_links:
                    break
                new_links = await self.get_subpage_links(
                    session, unchecked_links, base_url
                )
                for link in unchecked_links:
                    dict_links[link] = "Checked"
                    print(f"Checked: {link}")
                dict_links.update(
                    {
                        link: "Not-checked"
                        for link in new_links
                        if link not in dict_links
                    }
                )
                counter = len(
                    [
                        status
                        for status in dict_links.values()
                        if status == "Not-checked"
                    ]
                )

            checked_urls = [
                url for url, status in dict_links.items() if status == "Checked"
            ]
            return checked_urls

    def is_webpage(self, url: str) -> bool:
        try:
            response = requests.head(url, allow_redirects=True)
            content_type = response.headers.get("Content-Type", "").lower()
            return "text/html" in content_type
        except requests.RequestException:
            return False

    def clean_url_list(self, urls):
        files, webpages = [], []
        for url in urls:
            if self.is_webpage(url):
                webpages.append(url)
            else:
                files.append(url)
        return files, webpages

    def is_child_url(self, url, base_url):
        return url.startswith(base_url)

    def normalize_url(self, url: str):
        # Strip the fragment identifier
        defragged_url, _ = urldefrag(url)
        return defragged_url


def get_urls_from_file(file_path: str):
    """
    Function to get URLs from a file (one URL per line)
    """
    with open(file_path, "r") as f:
        urls = f.readlines()
    urls = [url.strip() for url in urls]
    return urls


def get_base_url(url):
    parsed_url = urlparse(url)
    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/"
    return base_url


def get_prompt(config):
    if config["llm_params"]["use_history"]:
        if config["llm_params"]["llm_loader"] == "local_llm":
            custom_prompt_template = tinyllama_prompt_template_with_history
        elif config["llm_params"]["llm_loader"] == "openai":
            custom_prompt_template = openai_prompt_template_with_history
        # else:
        #     custom_prompt_template = tinyllama_prompt_template_with_history  # default
        prompt = PromptTemplate(
            template=custom_prompt_template,
            input_variables=["context", "chat_history", "question"],
        )
    else:
        if config["llm_params"]["llm_loader"] == "local_llm":
            custom_prompt_template = tinyllama_prompt_template
        elif config["llm_params"]["llm_loader"] == "openai":
            custom_prompt_template = openai_prompt_template
        # else:
        #     custom_prompt_template = tinyllama_prompt_template
        prompt = PromptTemplate(
            template=custom_prompt_template,
            input_variables=["context", "question"],
        )
    return prompt
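

# Illustrative config shape expected by get_prompt above (keys taken from the
# lookups in that function; the values shown are placeholders only):
#
#   config = {
#       "llm_params": {
#           "use_history": True,
#           "llm_loader": "openai",  # or "local_llm"
#       },
#   }
#   prompt = get_prompt(config)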


def get_sources(res, answer):
    source_elements = []
    source_dict = {}  # Dictionary to store URL elements

    for idx, source in enumerate(res["source_documents"]):
        source_metadata = source.metadata
        url = source_metadata["source"]
        score = source_metadata.get("score", "N/A")
        page = source_metadata.get("page", 1)
        date = source_metadata.get("date", "N/A")

        url_name = f"{url}_{page}"
        if url_name not in source_dict:
            source_dict[url_name] = {
                "text": source.page_content,
                "url": url,
                "score": score,
                "page": page,
                "date": date,
            }
        else:
            source_dict[url_name]["text"] += f"\n\n{source.page_content}"

    # First, display the answer
    full_answer = "**Answer:**\n"
    full_answer += answer

    # Then, display the sources
    full_answer += "\n\n**Sources:**\n"
    for idx, (url_name, source_data) in enumerate(source_dict.items()):
        full_answer += f"\nSource {idx + 1} (Score: {source_data['score']}): {source_data['url']}\n"

        name = f"Source {idx + 1} Text\n"
        full_answer += name
        source_elements.append(
            cl.Text(name=name, content=source_data["text"], display="side")
        )

        # Add a PDF element if the source is a PDF file
        if source_data["url"].lower().endswith(".pdf"):
            name = f"Source {idx + 1} PDF\n"
            full_answer += name
            pdf_url = f"{source_data['url']}#page={source_data['page']+1}"
            source_elements.append(cl.Pdf(name=name, url=pdf_url, display="side"))

    full_answer += "\n**Metadata:**\n"
    for idx, (url_name, source_data) in enumerate(source_dict.items()):
        full_answer += f"Source {idx+1} Metadata\n"
        source_elements.append(
            cl.Text(
                name=f"Source {idx+1} Metadata",
                content=f"Page: {source_data['page']}\nDate: {source_data['date']}\n",
                display="side",
            )
        )

    return full_answer, source_elements


def get_metadata(file_names):
    """
    Function to get any additional metadata from the files
    Returns a dict with the file_name: {metadata: value}
    """
    metadata_dict = {}
    for file in file_names:
        metadata_dict[file] = {
            "source_type": "N/A",
        }
    return metadata_dict
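

if __name__ == "__main__":
    # Illustrative usage sketch for WebpageCrawler: crawl all child pages of a
    # starting URL, then split the results into webpages and non-HTML files.
    # The start URL below is a placeholder; replace it with a real page.
    start_url = "https://example.com/docs/index.html"  # placeholder

    crawler = WebpageCrawler()
    base_url = get_base_url(start_url)
    all_pages = asyncio.run(crawler.get_all_pages(start_url, base_url))
    files, webpages = crawler.clean_url_list(all_pages)

    print(f"Found {len(webpages)} webpages and {len(files)} files under {base_url}")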