# CCI_OLLAMA_CODE_CHAT / file_processing.py
# jeanmarcocruz207's picture
# Upload 29 files
# 3754f8b verified
# raw
# history blame
# 5.98 kB
import io
import json
import zipfile
from pathlib import Path
from typing import Tuple, List, Set
from config import settings
# Optional feature flags: each one records whether the third-party packages
# backing that feature could be imported in this environment.
try:
    from PIL import Image  # noqa
    import pytesseract  # noqa
except ImportError:
    # Either Pillow or pytesseract is missing, so OCR is switched off.
    HAS_OCR = False
else:
    HAS_OCR = True

try:
    import PyPDF2  # noqa
except ImportError:
    # PyPDF2 is missing, so PDF text extraction is switched off.
    HAS_PDF = False
else:
    HAS_PDF = True
# Maps a lower-cased file extension (leading dot included) to the language
# label used when describing that file's contents.
CODE_EXTS = {
    # General-purpose languages
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".java": "java",
    ".cs": "csharp",
    ".php": "php",
    ".rb": "ruby",
    ".go": "go",
    ".rs": "rust",
    # C family (headers are labelled as C) and stylesheets
    ".c": "c",
    ".cpp": "cpp",
    ".h": "c",
    ".css": "css",
    # Markup and queries
    ".html": "html",
    ".htm": "html",
    ".sql": "sql",
    # Shell scripts and data/config formats
    ".sh": "bash",
    ".bash": "bash",
    ".yml": "yaml",
    ".yaml": "yaml",
    ".json": "json",
    ".xml": "xml",
    # Documentation
    ".md": "markdown",
}
def guess_lang_from_name(name: str):
    """Return the language label for *name* based on its file extension.

    The extension is lower-cased before it is looked up in ``CODE_EXTS``;
    ``None`` is returned when the extension is not listed there.
    """
    extension = Path(name).suffix.lower()
    return CODE_EXTS.get(extension)
def guess_lang_from_content(content: str):
    """Guess a language label from characteristic substrings in *content*.

    The checks run from the most specific marker to the least specific one.
    The generic Python and SQL markers ("import ", "def ", "select ") also
    occur in Java, JavaScript and HTML sources, so they are tested last to
    avoid claiming those files.

    Args:
        content: Text to inspect.

    Returns:
        "java", "html", "javascript", "python" or "sql", or ``None`` when
        *content* is not a ``str`` or no marker is found.
    """
    if not isinstance(content, str):
        return None
    low = content.lower()
    # Java markers are matched case-sensitively against the raw text.
    if "public class" in content or "System.out.println" in content:
        return "java"
    # Checked before JavaScript/SQL: pages embed scripts and <select> tags.
    if "<html" in low:
        return "html"
    if "function " in low and "console.log" in low:
        return "javascript"
    if "def " in low or "import " in low:
        return "python"
    if "select " in low or "create table" in low:
        return "sql"
    return None
def truncate_text(txt: str) -> str:
    """Cap *txt* at ``settings.MAX_CHARS_PER_FILE`` characters.

    Text within the limit is returned unchanged. Longer text is cut at the
    limit and a marker line is appended to show that it was shortened.
    """
    limit = settings.MAX_CHARS_PER_FILE
    if len(txt) > limit:
        return txt[:limit] + "\n[... archivo recortado ...]"
    return txt
def read_image_to_text(raw: bytes) -> str:
    """Run OCR over the image stored in *raw* and return the text found.

    Returns a bracketed placeholder message instead of raising when OCR
    support is unavailable (``HAS_OCR`` is false), when no text is
    recognised, or when opening/recognising the image fails.
    """
    if HAS_OCR:
        try:
            from PIL import Image
            import pytesseract

            picture = Image.open(io.BytesIO(raw))
            recognised = pytesseract.image_to_string(picture).strip()
            if recognised:
                return recognised
            return "[Imagen sin texto extraíble]"
        except Exception as exc:
            # Any failure is reported inline rather than propagated.
            return f"[Error OCR: {exc}]"
    return "[Funcionalidad OCR no disponible. Instala 'pytesseract' y 'tesseract-ocr']"
def read_pdf_to_text(raw: bytes) -> str:
    """Extract the text of every page of the PDF stored in *raw*.

    Page texts are joined with newlines and stripped. A bracketed
    placeholder message is returned instead of raising when PDF support is
    unavailable (``HAS_PDF`` is false), when no text could be extracted, or
    when reading the document fails.
    """
    if not HAS_PDF:
        return "[Funcionalidad PDF no disponible. Instala 'PyPDF2']"
    try:
        import PyPDF2

        document = PyPDF2.PdfReader(io.BytesIO(raw))
        page_texts = []
        for page in document.pages:
            extracted = page.extract_text()
            # Pages without extractable text contribute an empty string.
            page_texts.append(extracted if extracted else "")
        joined = "\n".join(page_texts).strip()
    except Exception as exc:
        # Any failure is reported inline rather than propagated.
        return f"[Error PDF: {exc}]"
    return joined if joined else "[PDF sin texto extraíble]"
def read_zip(raw: bytes, zip_name: str) -> str:
    """Return the text of the code/text members of a ZIP archive.

    Members whose extension is listed in ``CODE_EXTS`` (or is ``.txt`` /
    ``.md``) are decoded as UTF-8 with replacement of invalid bytes,
    shortened with ``truncate_text`` and emitted under a
    ``--- <member name> (<language>) ---`` header. Directories and all other
    members are skipped without being decompressed.

    Args:
        raw: Bytes of the ZIP archive.
        zip_name: Archive name, used only in the returned messages.

    Returns:
        The member sections joined with newlines, or a bracketed message
        when the archive is corrupt, cannot be read, or holds no usable
        member.
    """
    # UTF-8 uses at most 4 bytes per character, so this many bytes always
    # yields more than MAX_CHARS_PER_FILE characters when the member is that
    # long. truncate_text therefore produces the same output as it would
    # for the full member, while memory use stays bounded no matter how far
    # a member expands on decompression.
    max_bytes = 4 * (settings.MAX_CHARS_PER_FILE + 1)
    collected = []
    try:
        with zipfile.ZipFile(io.BytesIO(raw)) as zf:
            for info in zf.infolist():
                if info.is_dir():
                    continue
                inner_name = info.filename
                ext = Path(inner_name).suffix.lower()
                # Filter before reading so irrelevant members are never
                # decompressed.
                if ext not in CODE_EXTS and ext not in (".txt", ".md"):
                    continue
                with zf.open(info) as member:
                    data = member.read(max_bytes)
                # errors="replace" never raises for bytes input.
                text = data.decode("utf-8", errors="replace")
                # The content-based guess only sees the bounded prefix.
                lang = (
                    guess_lang_from_name(inner_name)
                    or guess_lang_from_content(text)
                    or "text"
                )
                text = truncate_text(text)
                collected.append(f"--- {inner_name} ({lang}) ---\n{text}\n")
    except zipfile.BadZipFile:
        return f"[Error leyendo ZIP: archivo corrupto ({zip_name})]"
    except Exception as e:
        # Covers e.g. encrypted members or a non-bytes *raw*; reported
        # inline so one bad upload does not abort the caller.
        return f"[Error leyendo ZIP: {e}]"
    return "\n".join(collected) if collected else f"[ZIP {zip_name} sin archivos útiles]"
def read_uploaded_files(files, exclude_text: str) -> Tuple[str, str, str]:
    """Convert uploaded files into prompt text, a preview and a code sample.

    Each file is dispatched on its extension: ``.zip`` archives go through
    ``read_zip``, images through ``read_image_to_text``, ``.pdf`` through
    ``read_pdf_to_text``, and everything else is treated as UTF-8 text.

    Args:
        files: Iterable of file-like objects exposing ``read()`` and,
            optionally, ``name`` and ``seek()``. May be empty or ``None``.
        exclude_text: Newline-separated file names (base name or full name)
            to skip. ``None`` is treated as an empty list.

    Returns:
        A tuple ``(combined, preview, first_code)``: the per-file sections
        joined with newlines, one preview line per handled file, and the
        first piece of content suitable as a code sample ("" when none).
    """
    if not files:
        return "", "Sin archivos", ""

    # Tolerate None so an exclusion field that was never filled in does not
    # crash on .splitlines().
    exclude: Set[str] = {
        line.strip() for line in (exclude_text or "").splitlines() if line.strip()
    }
    image_suffixes = (".png", ".jpg", ".jpeg", ".webp", ".bmp")

    parts: List[str] = []
    preview: List[str] = []
    total_size = 0
    first_code = ""

    for f in files:
        name = getattr(f, "name", "archivo")
        basename = Path(name).name
        if basename in exclude or name in exclude:
            preview.append(f"🚫 {basename} (excluido)")
            continue

        try:
            f.seek(0)
        except Exception:
            # Best effort: objects without seek() or non-seekable streams
            # are simply read from their current position.
            pass
        raw = f.read()

        file_size = len(raw)
        total_size += file_size
        if total_size > settings.MAX_TOTAL_UPLOAD:
            preview.append("⚠️ Límite total de carga superado, se ignoró el resto.")
            break
        if file_size > settings.MAX_FILE_SIZE:
            parts.append(f"# {basename}\n[Archivo muy grande, ignorado]\n")
            preview.append(f"⚠️ {basename} (muy grande)")
            continue

        suffix = Path(basename).suffix.lower()
        if suffix == ".zip":
            content = read_zip(raw, basename)
            parts.append(f"# {basename} (zip)\n{content}\n")
            preview.append(f"📦 {basename}")
            if not first_code and content.strip():
                first_code = content[:settings.MAX_CHARS_PER_FILE]
        elif suffix in image_suffixes:
            content = read_image_to_text(raw)
            parts.append(f"# {basename} (imagen)\n{content}\n")
            preview.append(f"🖼️ {basename}")
        elif suffix == ".pdf":
            content = read_pdf_to_text(raw)
            parts.append(f"# {basename} (pdf)\n{content}\n")
            preview.append(f"📄 {basename}")
        else:
            if isinstance(raw, str):
                # A handle opened in text mode already yields str; use it
                # as-is instead of discarding the content.
                text = raw
            else:
                try:
                    text = raw.decode("utf-8", errors="replace")
                except Exception:
                    # raw is neither str nor bytes-like.
                    text = "[No decodificable]"
            text = truncate_text(text)
            lang = guess_lang_from_name(basename) or guess_lang_from_content(text) or "text"
            parts.append(f"# {basename} ({lang})\n{text}\n")
            preview.append(f"📝 {basename} ({lang})")
            if not first_code and lang != "text":
                first_code = text

    preview_text = "\n".join(preview) if preview else "Sin archivos válidos"
    return "\n".join(parts), preview_text, first_code