# chunker.py
import os
import sys
import logging
import json
import argparse
from typing import List, Dict, Optional
from langchain.text_splitter import RecursiveCharacterTextSplitter
# MODIFIED: Import the text extraction utility to avoid code duplication
from utils import extract_text_from_file, FAISS_RAG_SUPPORTED_EXTENSIONS
# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# Note: The 'extract_text_from_file' and 'SUPPORTED_EXTENSIONS' dictionary
# have been removed from this file and are now imported from 'utils.py'
# to ensure a single source of truth for file processing logic.
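# Assumed interface from 'utils.py' (defined there, not here): based on how this
# module uses them, 'extract_text_from_file(file_path)' is expected to return the
# document's text as a string (falsy on failure), and
# 'FAISS_RAG_SUPPORTED_EXTENSIONS' is expected to contain the supported lowercase
# file extensions (e.g. 'pdf', 'docx', 'txt') as its keys.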
def process_sources_and_create_chunks(
    sources_dir: str,
    output_file: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 150,
    text_output_dir: Optional[str] = None
) -> None:
"""
Scans a directory for source files, extracts text, splits it into chunks,
and saves the chunks to a single JSON file.
Optionally saves the raw extracted text to a specified directory.
"""
    if not os.path.isdir(sources_dir):
        logger.error(f"Source directory not found: '{sources_dir}'")
        raise FileNotFoundError(f"Source directory not found: '{sources_dir}'")

    logger.info(f"Starting chunking process. Sources: '{sources_dir}', Output: '{output_file}'")

    if text_output_dir:
        os.makedirs(text_output_dir, exist_ok=True)
        logger.info(f"Will save raw extracted text to: '{text_output_dir}'")

    all_chunks_for_json: List[Dict] = []
    processed_files_count = 0
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
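    # The splitter above tries progressively smaller separators (paragraphs,
    # then lines, then words) so each chunk stays close to `chunk_size`
    # characters while overlapping its neighbour by `chunk_overlap` characters
    # of shared context.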
    for filename in os.listdir(sources_dir):
        file_path = os.path.join(sources_dir, filename)
        if not os.path.isfile(file_path):
            continue

        file_ext = filename.split('.')[-1].lower()
        if file_ext not in FAISS_RAG_SUPPORTED_EXTENSIONS:
            logger.debug(f"Skipping unsupported file: {filename}")
            continue

        logger.info(f"Processing source file: {filename}")
        # MODIFIED: Use the imported extraction function so utils.py stays the
        # single source of truth for file processing logic
        text_content = extract_text_from_file(file_path)
        if text_content:
            if text_output_dir:
                try:
                    text_output_path = os.path.join(text_output_dir, f"{filename}.txt")
                    with open(text_output_path, 'w', encoding='utf-8') as f_text:
                        f_text.write(text_content)
                    logger.info(f"Saved extracted text for '{filename}' to '{text_output_path}'")
                except Exception as e_text_save:
                    logger.error(f"Could not save extracted text for '{filename}': {e_text_save}")

            chunks = text_splitter.split_text(text_content)
            if not chunks:
                logger.warning(f"No chunks generated from {filename}. Skipping.")
                continue

            for i, chunk_text in enumerate(chunks):
                chunk_data = {
                    "page_content": chunk_text,
                    "metadata": {
                        "source_document_name": filename,
                        "chunk_index": i,
                        "full_location": f"{filename}, Chunk {i+1}"
                    }
                }
                all_chunks_for_json.append(chunk_data)
            processed_files_count += 1
        else:
            logger.warning(f"Could not extract text from {filename}. Skipping.")
    if not all_chunks_for_json:
        logger.warning(f"No processable documents found or no text extracted in '{sources_dir}'. JSON file will be empty.")

    output_dir = os.path.dirname(output_file)
    if output_dir:  # dirname is '' when output_file has no directory component
        os.makedirs(output_dir, exist_ok=True)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(all_chunks_for_json, f, indent=2)

    logger.info(f"Chunking complete. Processed {processed_files_count} files.")
    logger.info(f"Created a total of {len(all_chunks_for_json)} chunks.")
    logger.info(f"Chunked JSON output saved to: {output_file}")
def main():
    parser = argparse.ArgumentParser(description="Process source documents into a JSON file of text chunks for RAG.")
    parser.add_argument(
        '--sources-dir',
        type=str,
        required=True,
        help="The directory containing source files (PDFs, DOCX, TXT)."
    )
    parser.add_argument(
        '--output-file',
        type=str,
        required=True,
        help="The full path for the output JSON file containing the chunks."
    )
    parser.add_argument(
        '--text-output-dir',
        type=str,
        default=None,
        help="Optional: The directory to save raw extracted text files for debugging."
    )
    parser.add_argument(
        '--chunk-size',
        type=int,
        default=1000,
        help="The character size for each text chunk."
    )
    parser.add_argument(
        '--chunk-overlap',
        type=int,
        default=150,
        help="The character overlap between consecutive chunks."
    )
    args = parser.parse_args()
    try:
        process_sources_and_create_chunks(
            sources_dir=args.sources_dir,
            output_file=args.output_file,
            chunk_size=args.chunk_size,
            chunk_overlap=args.chunk_overlap,
            text_output_dir=args.text_output_dir
        )
    except Exception as e:
        logger.critical(f"A critical error occurred during the chunking process: {e}", exc_info=True)
        sys.exit(1)
if __name__ == "__main__":
main()
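# Example invocation (paths are illustrative):
#   python chunker.py \
#       --sources-dir ./source_documents \
#       --output-file ./data/chunks.json \
#       --text-output-dir ./data/extracted_text \
#       --chunk-size 1000 \
#       --chunk-overlap 150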