Spaces:
Starting
Starting
import logging | |
import os | |
import pandas as pd | |
import PyPDF2 | |
import speech_recognition as sr | |
import re | |
from langchain_core.tools import StructuredTool | |
from pydantic import BaseModel, Field | |
from typing import Optional | |
logger = logging.getLogger(__name__) | |
# Argument schema handed to the structured tool: one field per parameter of
# ``file_parser_func``. Only ``query`` is optional.
class FileParserInput(BaseModel):
    # Required fields (``...`` marks "no default" explicitly).
    task_id: str = Field(..., description="Task identifier")
    file_type: str = Field(..., description="File extension (e.g., pdf, csv)")
    file_path: str = Field(..., description="Path to the file")
    # Optional free-text question that steers what is extracted from the file.
    query: Optional[str] = Field(default=None, description="Query related to the file")
def _query_mentions(query: Optional[str], *keywords: str) -> bool:
    """Return True when *query* contains any of *keywords* (case-insensitive)."""
    if not query:
        return False
    lowered = query.lower()
    return any(keyword in lowered for keyword in keywords)


def _numbers_in_text(text: str, not_found: str) -> str:
    """Return the standalone integers in *text*, sorted ascending and comma-joined.

    ``not_found`` is returned verbatim when *text* contains no integers.
    """
    numbers = re.findall(r"\b\d+\b", text)
    if not numbers:
        return not_found
    return ", ".join(sorted(numbers, key=int))


def _column_total(df, query: Optional[str], *, food_filter: bool) -> str:
    """Sum the first numeric column of *df*, formatted with two decimals.

    With ``food_filter`` set and a query mentioning "food", only rows that have
    "food" in at least one cell are summed; if no row matches, the whole column
    is summed instead.
    """
    # "number" covers every integer/float width, not just int64/float64;
    # timedelta is excluded because it cannot be formatted with ".2f".
    numeric_cols = df.select_dtypes(include=["number"], exclude=["timedelta"]).columns
    if numeric_cols.empty:
        return "No numerical data found"
    target = numeric_cols[0]
    if food_filter and _query_mentions(query, "food"):
        # Inspect the cell values themselves. Stringifying the whole row would
        # also match column labels and misses text cut off by pandas' repr.
        mask = df.apply(
            lambda row: any("food" in str(cell).lower() for cell in row),
            axis=1,
        )
        food_rows = df[mask]
        if not food_rows.empty:
            return f"{food_rows[target].sum():.2f}"
    return f"{df[target].sum():.2f}"


# NOTE(review): the tool registration in this file passes no explicit
# description, so this docstring presumably doubles as the tool description
# shown to the model -- keep it short and user-facing.
async def file_parser_func(task_id: str, file_type: str, file_path: str, query: Optional[str] = None) -> str:
    """
    Parse a file based on task_id, file_type, file_path, and query context.
    Args:
    task_id (str): Task identifier.
    file_type (str): File extension (e.g., 'xlsx', 'csv', 'pdf', 'txt', 'mp3'); case and a leading dot are ignored.
    file_path (str): Path to the file.
    query (Optional[str]): Question context to guide parsing (e.g., for specific data extraction).
    Returns:
    str: Parsed content or error message.
    """
    try:
        if not os.path.exists(file_path):
            logger.warning(f"File not found: {file_path}")
            return "File not found"
        logger.info(f"Parsing file: {file_path} for task {task_id}")

        # Accept "PDF", ".pdf", " pdf " and the like.
        kind = (file_type or "").strip().lower().lstrip(".")
        wants_total = _query_mentions(query, "sum", "total")
        wants_numbers = _query_mentions(query, "page number")

        if kind in ("xlsx", "xls"):
            # openpyxl only reads the OOXML formats; for legacy .xls let pandas
            # choose the engine itself.
            engine = "openpyxl" if kind == "xlsx" else None
            df = pd.read_excel(file_path, engine=engine)
            if wants_total:
                return _column_total(df, query, food_filter=True)
            return df.to_string(index=False)

        if kind == "csv":
            df = pd.read_csv(file_path)
            if wants_total:
                # The CSV path has never applied the "food" row filter; kept as is.
                return _column_total(df, query, food_filter=False)
            return df.to_string(index=False)

        if kind == "pdf":
            with open(file_path, "rb") as f:
                reader = PyPDF2.PdfReader(f)
                # Join pages with a newline so the last token of one page does
                # not fuse with the first token of the next.
                text = "\n".join(page.extract_text() or "" for page in reader.pages)
            if wants_numbers:
                return _numbers_in_text(text, "No page numbers found")
            return text.strip() or "No text extracted"

        if kind == "txt":
            # errors="replace": a few undecodable bytes should not abort the parse.
            with open(file_path, "r", encoding="utf-8", errors="replace") as f:
                text = f.read()
            if wants_numbers:
                return _numbers_in_text(text, "No page numbers found")
            return text.strip()

        if kind == "mp3":
            # NOTE(review): sr.AudioFile is presumably limited to WAV/AIFF/FLAC
            # input; if so, a real MP3 raises here and is reported by the outer
            # handler. Converting first would need an extra dependency -- confirm.
            recognizer = sr.Recognizer()
            with sr.AudioFile(file_path) as source:
                audio = recognizer.record(source)
            try:
                text = recognizer.recognize_google(audio)
                logger.debug(f"Transcribed audio: {text}")
                if wants_numbers:
                    return _numbers_in_text(text, "No page numbers provided")
                return text
            except sr.UnknownValueError:
                logger.error("Could not understand audio")
                return "No text transcribed from audio"
            except Exception as e:
                logger.error(f"Audio parsing failed: {e}")
                return "Error transcribing audio"

        logger.warning(f"Unsupported file type: {file_type}")
        return f"Unsupported file type: {file_type}"
    except Exception as e:
        # Tool boundary: never raise to the caller, report the failure as text.
        logger.exception(f"Error parsing file for task {task_id}: {e}")
        return f"Error: {str(e)}"
# Expose the parser as a LangChain structured tool whose arguments are
# validated against FileParserInput.
# NOTE(review): `func` receives the same async callable as `coroutine`; a
# synchronous tool call would presumably hand back an un-awaited coroutine
# object -- confirm that callers only use the async path.
file_parser_tool = StructuredTool.from_function(
    name="file_parser_tool",
    args_schema=FileParserInput,
    func=file_parser_func,
    coroutine=file_parser_func,
)