"""Text extraction from local documents and Google Drive download helpers."""

import logging
import os
import re
import shutil
import tempfile
import time
import zipfile
from typing import Optional

import docx as python_docx
import gdown
from pypdf import PdfReader

logger = logging.getLogger(__name__)


def extract_text_from_file(file_path: str, file_type: str) -> Optional[str]:
    """Extract plain text from a PDF, DOCX or TXT file.

    Args:
        file_path: Path of the file to read.
        file_type: One of ``'pdf'``, ``'docx'`` or ``'txt'`` (lower-case).

    Returns:
        The extracted text with surrounding whitespace stripped, or ``None``
        when the type is unsupported, nothing but whitespace was extracted,
        or any error occurred (errors are logged, never raised).
    """
    logger.info(f"[TEXT_EXTRACTION] Starting extraction from {file_type.upper()} file: {file_path}")
    text_content = None
    try:
        if file_type == 'pdf':
            reader = PdfReader(file_path)
            # Extract each page exactly once: extraction is the expensive
            # step, so do not repeat it for the emptiness filter.
            page_texts = (page.extract_text() for page in reader.pages)
            text_content = "".join(f"{page_text}\n" for page_text in page_texts if page_text)
            logger.info(f"[TEXT_EXTRACTION] PDF extracted {len(reader.pages)} pages, {len(text_content)} characters")
        elif file_type == 'docx':
            doc = python_docx.Document(file_path)
            text_content = "\n".join(para.text for para in doc.paragraphs if para.text)
            logger.info(f"[TEXT_EXTRACTION] DOCX extracted {len(doc.paragraphs)} paragraphs, {len(text_content)} characters")
        elif file_type == 'txt':
            # Undecodable bytes are dropped rather than failing the whole file.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text_content = f.read()
            logger.info(f"[TEXT_EXTRACTION] TXT extracted {len(text_content)} characters")
        else:
            logger.warning(f"[TEXT_EXTRACTION] Unsupported file type: {file_type} for file {file_path}")
            return None

        if not text_content or not text_content.strip():
            logger.warning(f"[TEXT_EXTRACTION] No text content extracted from {file_path}")
            return None

        logger.info(f"[TEXT_EXTRACTION] Successfully extracted text from {file_path}")
        return text_content.strip()
    except Exception as e:
        logger.error(f"[TEXT_EXTRACTION] Error extracting text from {file_path} ({file_type.upper()}): {e}", exc_info=True)
        return None


# Maps a lower-case file extension to a callable that takes a file path and
# returns the extracted text (or None).
FAISS_RAG_SUPPORTED_EXTENSIONS = {
    'pdf': lambda path: extract_text_from_file(path, 'pdf'),
    'docx': lambda path: extract_text_from_file(path, 'docx'),
    'txt': lambda path: extract_text_from_file(path, 'txt'),
}

# URL shapes an ID can be pulled from, tried in this order:
# ".../folders/<id>", ".../d/<id>", "...id=<id>".
_GDRIVE_ID_PATTERNS = (
    re.compile(r"/folders/([a-zA-Z0-9_-]+)"),
    re.compile(r"/d/([a-zA-Z0-9_-]+)"),
    re.compile(r"id=([a-zA-Z0-9_-]+)"),
)


def get_id_from_gdrive_input(url_or_id: str) -> Optional[str]:
    """Return the Google Drive ID contained in a URL, or the input if it already looks like an ID.

    Args:
        url_or_id: A Drive folder/file URL or a bare ID.

    Returns:
        The ID string, or ``None`` when the input is empty or no ID could be
        recognised (a warning is logged in the latter case).
    """
    if not url_or_id:
        return None

    for pattern in _GDRIVE_ID_PATTERNS:
        match = pattern.search(url_or_id)
        if match:
            return match.group(1)

    # No URL pattern matched: accept the input as a bare ID when it contains
    # no URL-like characters and is longer than 10 characters.
    looks_like_bare_id = not any(ch in url_or_id for ch in "/=.") and len(url_or_id) > 10
    if looks_like_bare_id:
        return url_or_id

    logger.warning(f"Could not reliably extract Google Drive ID from input: {url_or_id}")
    return None


def download_gdrive_file(file_id_or_url: str, target_path: str) -> bool:
    """Download a single file from Google Drive to a specific path.

    Args:
        file_id_or_url: Drive file ID or a URL containing one.
        target_path: Destination file path. Its parent directory is created
            when missing; a bare filename targets the current directory.

    Returns:
        ``True`` when a non-empty file exists at ``target_path`` afterwards,
        ``False`` otherwise (errors are logged, never raised).
    """
    logger.info(f"[GDRIVE_SINGLE_FILE] Attempting to download file. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        logger.error(f"[GDRIVE_SINGLE_FILE] Invalid Google Drive File ID or URL provided: {file_id_or_url}")
        return False

    try:
        # Ensure the target directory exists before downloading. dirname() is
        # '' for a bare filename, and os.makedirs('') raises, so skip it then.
        target_dir = os.path.dirname(target_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        logger.info(f"[GDRIVE_SINGLE_FILE] Downloading file ID: {file_id} to path: {target_path}")
        # Use gdown to download directly to the target file path.
        gdown.download(id=file_id, output=target_path, quiet=False, fuzzy=True)

        if not os.path.exists(target_path) or os.path.getsize(target_path) == 0:
            logger.error("[GDRIVE_SINGLE_FILE] Download failed or the resulting file is empty.")
            return False

        logger.info("[GDRIVE_SINGLE_FILE] Download successful.")
        return True
    except Exception as e:
        logger.error(f"[GDRIVE_SINGLE_FILE] An error occurred during download: {e}", exc_info=True)
        return False


def download_and_unzip_gdrive_file(file_id_or_url: str, target_extraction_dir: str) -> bool:
    """Download a single ZIP file from Google Drive and extract its contents.

    The archive is downloaded into a temporary directory that is removed
    again regardless of the outcome.

    Args:
        file_id_or_url: Drive file ID or a URL containing one.
        target_extraction_dir: Directory to extract into (created if missing).

    Returns:
        ``True`` on successful extraction, ``False`` otherwise (errors are
        logged, never raised).
    """
    logger.info(f"[GDRIVE_FILE] Attempting to download and extract ZIP from Google Drive. Input: {file_id_or_url}")
    file_id = get_id_from_gdrive_input(file_id_or_url)
    if not file_id:
        logger.error(f"[GDRIVE_FILE] Invalid Google Drive File ID or URL provided: {file_id_or_url}")
        return False

    temp_download_dir = tempfile.mkdtemp(prefix="gdrive_zip_")
    temp_zip_path = os.path.join(temp_download_dir, "downloaded_file.zip")

    try:
        logger.info(f"[GDRIVE_FILE] Downloading file ID: {file_id} to temporary path: {temp_zip_path}")
        gdown.download(id=file_id, output=temp_zip_path, quiet=False)

        if not os.path.exists(temp_zip_path) or os.path.getsize(temp_zip_path) == 0:
            logger.error("[GDRIVE_FILE] Download failed or the resulting file is empty.")
            return False

        logger.info(f"[GDRIVE_FILE] Download successful. Extracting ZIP to: {target_extraction_dir}")
        os.makedirs(target_extraction_dir, exist_ok=True)

        with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref:
            zip_ref.extractall(target_extraction_dir)

        logger.info("[GDRIVE_FILE] Successfully extracted ZIP archive.")
        return True
    except Exception as e:
        logger.error(f"[GDRIVE_FILE] An error occurred during download or extraction: {e}", exc_info=True)
        return False
    finally:
        if os.path.exists(temp_download_dir):
            try:
                shutil.rmtree(temp_download_dir)
                logger.debug(f"[GDRIVE_FILE] Cleaned up temporary directory: {temp_download_dir}")
            except Exception as e_del:
                logger.warning(f"[GDRIVE_FILE] Could not remove temporary directory '{temp_download_dir}': {e_del}")


def _pause_and_reset_directory(directory: str, delay_seconds: float) -> None:
    """Sleep before a retry, then replace *directory* with an empty one."""
    logger.info(f"[GDRIVE] Retrying in {delay_seconds} seconds...")
    time.sleep(delay_seconds)
    if os.path.exists(directory):
        shutil.rmtree(directory)
    os.makedirs(directory)


def download_and_unzip_gdrive_folder(folder_id_or_url: str, target_dir_for_contents: str) -> bool:
    """Download a Google Drive folder and move its contents into a target directory.

    The folder is downloaded (up to 3 attempts, 10 seconds apart) into a
    temporary directory. If that directory ends up holding exactly one
    sub-directory, the contents of that sub-directory are moved instead of
    the sub-directory itself. Existing items with the same name in the
    target are replaced. The temporary directory is always removed.

    Args:
        folder_id_or_url: Drive folder ID or a URL containing one.
        target_dir_for_contents: Destination directory (created if missing).

    Returns:
        ``True`` when the contents were moved, ``False`` otherwise (errors
        are logged, never raised).
    """
    logger.info(f"[GDRIVE] Attempting to download sources from Google Drive. Input: {folder_id_or_url}")
    folder_id = get_id_from_gdrive_input(folder_id_or_url)
    if not folder_id:
        logger.error(f"[GDRIVE] Invalid Google Drive Folder ID or URL provided: {folder_id_or_url}")
        return False

    temp_download_parent_dir = tempfile.mkdtemp(prefix="gdrive_parent_")
    download_path = None

    try:
        max_retries = 3
        retry_delay_seconds = 10
        last_gdown_exception = None

        for attempt in range(max_retries):
            is_last_attempt = attempt >= max_retries - 1
            logger.info(f"[GDRIVE] Attempt {attempt + 1} of {max_retries} to download folder ID: {folder_id}")
            try:
                start_time = time.time()
                download_path = gdown.download_folder(
                    id=folder_id,
                    output=temp_download_parent_dir,
                    quiet=False,
                    use_cookies=False,
                )
                download_time = time.time() - start_time

                if download_path and os.path.exists(temp_download_parent_dir) and os.listdir(temp_download_parent_dir):
                    logger.info(f"[GDRIVE] Successfully downloaded in {download_time:.2f}s. Path: {download_path}")
                    last_gdown_exception = None
                    break

                logger.warning(f"[GDRIVE] Attempt {attempt + 1} completed but directory is empty")
                if is_last_attempt:
                    # Handled by the except clause below, which reports the
                    # final failure and returns.
                    raise Exception("gdown failed to populate the directory after multiple attempts.")
                _pause_and_reset_directory(temp_download_parent_dir, retry_delay_seconds)
            except Exception as e:
                last_gdown_exception = e
                logger.warning(f"[GDRIVE] Attempt {attempt + 1} failed: {e}")
                if is_last_attempt:
                    logger.error(f"[GDRIVE] Failed after {max_retries} attempts. Last error: {e}", exc_info=True)
                    return False
                _pause_and_reset_directory(temp_download_parent_dir, retry_delay_seconds)

        if last_gdown_exception:
            # Defensive guard. No exception is being handled here, so pass the
            # stored exception explicitly to get a meaningful traceback.
            logger.error(
                f"[GDRIVE] Failed after all retries. Last error: {last_gdown_exception}",
                exc_info=last_gdown_exception,
            )
            return False

        os.makedirs(target_dir_for_contents, exist_ok=True)

        # gdown.download_folder returns a list of downloaded file paths, which
        # os.path.isdir() rejects with TypeError. Only use the return value as
        # a directory hint when it is actually path-like.
        root_hint = download_path if isinstance(download_path, (str, os.PathLike)) else None
        hint_is_dir = bool(root_hint) and os.path.isdir(root_hint)

        items_in_temp_parent = os.listdir(temp_download_parent_dir)
        source_content_root = temp_download_parent_dir

        if len(items_in_temp_parent) == 1:
            potential_actual_root = os.path.join(temp_download_parent_dir, items_in_temp_parent[0])
            if os.path.isdir(potential_actual_root):
                if hint_is_dir and os.path.normpath(root_hint) == os.path.normpath(potential_actual_root):
                    logger.info(f"[GDRIVE] Using nested directory: {items_in_temp_parent[0]}")
                    source_content_root = potential_actual_root
                elif not hint_is_dir:
                    logger.info(f"[GDRIVE] Using nested directory (heuristic): {items_in_temp_parent[0]}")
                    source_content_root = potential_actual_root

        logger.info(f"[GDRIVE] Moving contents from {source_content_root} to {target_dir_for_contents}")

        files_moved = 0
        for item_name in os.listdir(source_content_root):
            s_item = os.path.join(source_content_root, item_name)
            d_item = os.path.join(target_dir_for_contents, item_name)

            # Replace whatever already exists at the destination.
            if os.path.exists(d_item):
                if os.path.isdir(d_item):
                    shutil.rmtree(d_item)
                else:
                    os.remove(d_item)

            shutil.move(s_item, d_item)
            files_moved += 1

        logger.info(f"[GDRIVE] Successfully moved {files_moved} items to {target_dir_for_contents}")
        return True
    except Exception as e:
        logger.error(f"[GDRIVE] Unexpected error during download/processing: {e}", exc_info=True)
        return False
    finally:
        if os.path.exists(temp_download_parent_dir):
            try:
                shutil.rmtree(temp_download_parent_dir)
                logger.debug("[GDRIVE] Cleaned up temporary directory")
            except Exception as e_del:
                logger.warning(f"[GDRIVE] Could not remove temporary directory: {e_del}")