#!/usr/bin/env python3
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "en-core-web-sm",
#     "spacy",
# ]
#
# [tool.uv.sources]
# en-core-web-sm = { url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl" }
# ///
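# Example invocation (a sketch; the filename "chunk_text.py" is illustrative and
# not taken from this script):
#
#   uv run chunk_text.py corpus_a.txt corpus_b.txt -n 200 --lemma -o chunks.csv
#
# uv reads the inline script metadata above to install spacy and the pinned
# en_core_web_sm wheel before running.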
import argparse
import csv
import re
import sys
from pathlib import Path

import spacy


def main():
    parser = argparse.ArgumentParser(
        description="Tokenize text files and output chunked CSV"
    )
    parser.add_argument("files", nargs="+", help="Input text file(s) to process")
    parser.add_argument(
        "-n",
        "--tokens",
        type=int,
        default=100,
        help="Number of tokens per chunk (default: 100)",
    )
    parser.add_argument(
        "-l",
        "--label",
        type=str,
        help="Custom label for all chunks (defaults to filename)",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=str,
        default="output.csv",
        help="Output CSV filename (default: output.csv)",
    )
    parser.add_argument(
        "-c",
        "--max-chunks",
        type=int,
        help="Maximum number of chunks to output (default: unlimited)",
    )
    parser.add_argument(
        "--lemma",
        action="store_true",
        help="Use lemmatized forms of tokens instead of original text",
    )
    args = parser.parse_args()
    # Load spaCy model
    nlp = spacy.load("en_core_web_sm")

    # Process files and collect chunks
    all_chunks = []
    chunks_created = 0

    for filename in args.files:
        if args.max_chunks and chunks_created >= args.max_chunks:
            break

        filepath = Path(filename)
        if not filepath.exists():
            print(f"Warning: File '{filename}' not found, skipping...", file=sys.stderr)
            continue

        try:
            with open(filepath, "r", encoding="utf-8") as f:
                text = f.read()
        except Exception as e:
            print(f"Error reading '{filename}': {e}", file=sys.stderr)
            continue
        # Split on one or more newlines
        segments = re.split(r"\n+", text)
        # Remove empty segments
        segments = [seg.strip() for seg in segments if seg.strip()]

        # Process segments through spaCy pipe
        all_tokens = []
        for doc in nlp.pipe(segments):
            # Extract tokens from each processed segment
            if args.lemma:
                tokens = [token.lemma_ for token in doc]
            else:
                tokens = [token.text for token in doc]
            all_tokens.extend(tokens)

        # Determine label
        label = args.label if args.label else filepath.name

        # Create chunks of n tokens
        for i in range(0, len(all_tokens), args.tokens):
            if args.max_chunks and chunks_created >= args.max_chunks:
                break
            chunk = all_tokens[i : i + args.tokens]
            # Only include chunks with exactly n tokens
            if len(chunk) == args.tokens:
                chunk_text = " ".join(chunk)
                all_chunks.append({"text": chunk_text, "label": label})
                chunks_created += 1
    # Write to CSV
    if all_chunks:
        with open(args.output, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=["text", "label"])
            writer.writeheader()
            writer.writerows(all_chunks)
        print(f"Successfully wrote {len(all_chunks)} chunks to '{args.output}'")
        if args.lemma:
            print("Note: Tokens were lemmatized")
    else:
        print("No valid chunks to write.")


if __name__ == "__main__":
    main()