#!/usr/bin/env python3
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "en-core-web-sm",
#     "spacy",
# ]
#
# [tool.uv.sources]
# en-core-web-sm = { url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl" }
# ///
import argparse
import csv
import re
import sys
from pathlib import Path

import spacy


def main():
    parser = argparse.ArgumentParser(
        description="Tokenize text files and output chunked CSV"
    )
    parser.add_argument("files", nargs="+", help="Input text file(s) to process")
    parser.add_argument(
        "-n",
        "--tokens",
        type=int,
        default=100,
        help="Number of tokens per chunk (default: 100)",
    )
    parser.add_argument(
        "-l",
        "--label",
        type=str,
        help="Custom label for all chunks (defaults to filename)",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=str,
        default="output.csv",
        help="Output CSV filename (default: output.csv)",
    )
    parser.add_argument(
        "-c",
        "--max-chunks",
        type=int,
        help="Maximum number of chunks to output (default: unlimited)",
    )
    parser.add_argument(
        "--lemma",
        action="store_true",
        help="Use lemmatized forms of tokens instead of original text",
    )
    args = parser.parse_args()

    # Load spaCy model
    nlp = spacy.load("en_core_web_sm")

    # Process files and collect chunks
    all_chunks = []
    chunks_created = 0

    for filename in args.files:
        if args.max_chunks and chunks_created >= args.max_chunks:
            break

        filepath = Path(filename)
        if not filepath.exists():
            print(f"Warning: File '{filename}' not found, skipping...")
            continue

        try:
            with open(filepath, "r", encoding="utf-8") as f:
                text = f.read()
        except Exception as e:
            print(f"Error reading '{filename}': {e}")
            continue

        # Split on one or more newlines
        segments = re.split(r"\n+", text)
        # Remove empty segments
        segments = [seg.strip() for seg in segments if seg.strip()]

        # Process segments through spaCy pipe
        all_tokens = []
        for doc in nlp.pipe(segments):
            # Extract tokens from each processed segment
            if args.lemma:
                tokens = [token.lemma_ for token in doc]
            else:
                tokens = [token.text for token in doc]
            all_tokens.extend(tokens)

        # Determine label
        label = args.label if args.label else filepath.name

        # Create chunks of n tokens
        for i in range(0, len(all_tokens), args.tokens):
            if args.max_chunks and chunks_created >= args.max_chunks:
                break
            chunk = all_tokens[i : i + args.tokens]
            # Only include chunks with exactly n tokens
            if len(chunk) == args.tokens:
                chunk_text = " ".join(chunk)
                all_chunks.append({"text": chunk_text, "label": label})
                chunks_created += 1

    # Write to CSV
    if all_chunks:
        with open(args.output, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=["text", "label"])
            writer.writeheader()
            writer.writerows(all_chunks)
        print(f"Successfully wrote {len(all_chunks)} chunks to '{args.output}'")
        if args.lemma:
            print("Note: Tokens were lemmatized")
    else:
        print("No valid chunks to write.")


if __name__ == "__main__":
    main()