import os from dotenv import load_dotenv import uuid import matplotlib.pyplot as plt from pathlib import Path from typing import Dict, Any, List, Optional import pandas as pd import numpy as np import json import io import contextlib import traceback import time from datetime import datetime, timedelta import seaborn as sns import scipy.stats as stats from pydantic import BaseModel from supabase_service import upload_file_to_supabase # Load environment variables from .env file load_dotenv() class CodeResponse(BaseModel): """Container for code-related responses""" language: str = "python" code: str class ChartSpecification(BaseModel): """Details about requested charts""" image_description: str code: Optional[str] = None class AnalysisOperation(BaseModel): """Container for a single analysis operation with its code and result""" code: CodeResponse description: str class CsvChatResult(BaseModel): """Structured response for CSV-related AI interactions""" response_type: str # Literal["casual", "data_analysis", "visualization", "mixed"] casual_response: str analysis_operations: List[AnalysisOperation] charts: Optional[List[ChartSpecification]] = None class PythonExecutor: """Handles execution of Python code with comprehensive data analysis libraries""" def __init__(self, df: pd.DataFrame, charts_folder: str = "generated_charts"): """ Initialize the PythonExecutor with a DataFrame Args: df (pd.DataFrame): The DataFrame to operate on charts_folder (str): Folder to save charts in """ self.df = df self.charts_folder = Path(charts_folder) self.charts_folder.mkdir(exist_ok=True) def execute_code(self, code: str) -> Dict[str, Any]: """ Execute Python code with full data analysis context and return results Args: code (str): Python code to execute Returns: dict: Dictionary containing execution results and any generated plots """ output = "" error = None plots = [] # Capture stdout stdout = io.StringIO() # Monkey patch plt.show() to save figures original_show = plt.show def custom_show(): """Custom show function that saves plots instead of displaying them""" for i, fig in enumerate(plt.get_fignums()): figure = plt.figure(fig) # Save plot to bytes buffer buf = io.BytesIO() figure.savefig(buf, format='png', bbox_inches='tight') buf.seek(0) plots.append(buf.read()) plt.close('all') try: # Create comprehensive execution context with data analysis libraries exec_globals = { # Core data analysis 'pd': pd, 'np': np, 'df': self.df, # Visualization 'plt': plt, 'sns': sns, # Statistics 'stats': stats, # Date/time 'datetime': datetime, 'timedelta': timedelta, 'time': time, # Utilities 'json': json, '__builtins__': __builtins__, } # Replace plt.show with custom implementation plt.show = custom_show # Execute code and capture output with contextlib.redirect_stdout(stdout): exec(code, exec_globals) output = stdout.getvalue() except Exception as e: error = { "message": str(e), "traceback": traceback.format_exc() } finally: # Restore original plt.show plt.show = original_show return { 'output': output, 'error': error, 'plots': plots } async def save_plot_to_supabase(self, plot_data: bytes, description: str, chat_id: str) -> str: """ Save plot to Supabase storage and return the public URL Args: plot_data (bytes): Image data in bytes description (str): Description of the plot chat_id (str): ID of the chat session Returns: str: Public URL of the uploaded chart """ # Generate unique filename filename = f"chart_{uuid.uuid4().hex}.png" filepath = self.charts_folder / filename # Save the plot locally first with open(filepath, 'wb') as f: f.write(plot_data) try: # Upload to Supabase public_url = await upload_file_to_supabase( file_path=str(filepath), file_name=filename, chat_id=chat_id ) # Remove the local file after upload os.remove(filepath) return public_url except Exception as e: # Clean up local file if upload fails if os.path.exists(filepath): os.remove(filepath) raise Exception(f"Failed to upload plot to Supabase: {e}") def _looks_like_structured_data(self, output: str) -> bool: """Helper to detect JSON-like or array-like output""" output = output.strip() return ( output.startswith('{') and output.endswith('}') or # JSON object output.startswith('[') and output.endswith(']') or # Array '\n' in output and '=' in output # Python console output ) async def process_response(self, response: CsvChatResult, chat_id: str) -> str: """ Process the CsvChatResult response and generate formatted output with markdown code blocks for structured data. Args: response (CsvChatResult): Response from CSV analysis chat_id (str): ID of the chat session Returns: str: Formatted output with results and image URLs """ output_parts = [] # Add casual response output_parts.append(response.casual_response) # Process analysis operations for operation in response.analysis_operations: # Execute the code result = self.execute_code(operation.code.code) # Add operation description output_parts.append(f"\n{operation.description}:") # Add output or error with markdown wrapping if result['error']: output_parts.append("```python\n" + f"Error: {result['error']['message']}" + "\n```") else: output = result['output'].strip() if self._looks_like_structured_data(output): output_parts.append("```python\n" + output + "\n```") else: output_parts.append(output) # Process charts if response.charts: output_parts.append("\nVisualizations:") for chart in response.charts: if chart.code: result = self.execute_code(chart.code) if result['plots']: for plot_data in result['plots']: try: public_url = await self.save_plot_to_supabase( plot_data=plot_data, description=chart.image_description, chat_id=chat_id ) output_parts.append(f"\n{chart.image_description}") output_parts.append(f"![{chart.image_description}]({public_url})") except Exception as e: output_parts.append(f"\nError uploading chart: {str(e)}") elif result['error']: output_parts.append("```python\n" + f"Error generating {chart.image_description}: {result['error']['message']}" + "\n```") return "\n".join(output_parts) # Table formatter # import os # from dotenv import load_dotenv # import uuid # import matplotlib.pyplot as plt # from pathlib import Path # from typing import Dict, Any, List, Optional # import pandas as pd # import numpy as np # import json # import io # import contextlib # import traceback # import time # from datetime import datetime, timedelta # import seaborn as sns # import scipy.stats as stats # from pydantic import BaseModel # from supabase_service import upload_file_to_supabase # # Load environment variables from .env file # load_dotenv() # class CodeResponse(BaseModel): # """Container for code-related responses""" # language: str = "python" # code: str # class ChartSpecification(BaseModel): # """Details about requested charts""" # image_description: str # code: Optional[str] = None # class AnalysisOperation(BaseModel): # """Container for a single analysis operation with its code and result""" # code: CodeResponse # description: str # class CsvChatResult(BaseModel): # """Structured response for CSV-related AI interactions""" # response_type: str # Literal["casual", "data_analysis", "visualization", "mixed"] # casual_response: str # analysis_operations: List[AnalysisOperation] # charts: Optional[List[ChartSpecification]] = None # class PythonExecutor: # """Handles execution of Python code with comprehensive data analysis libraries""" # def __init__(self, df: pd.DataFrame, charts_folder: str = "generated_charts"): # """ # Initialize the PythonExecutor with a DataFrame # Args: # df (pd.DataFrame): The DataFrame to operate on # charts_folder (str): Folder to save charts in # """ # self.df = df # self.charts_folder = Path(charts_folder) # self.charts_folder.mkdir(exist_ok=True) # def execute_code(self, code: str) -> Dict[str, Any]: # """ # Execute Python code with full data analysis context and return results # Args: # code (str): Python code to execute # Returns: # dict: Dictionary containing execution results and any generated plots # """ # output = "" # error = None # plots = [] # # Capture stdout # stdout = io.StringIO() # # Monkey patch plt.show() to save figures # original_show = plt.show # def custom_show(): # """Custom show function that saves plots instead of displaying them""" # for i, fig in enumerate(plt.get_fignums()): # figure = plt.figure(fig) # # Save plot to bytes buffer # buf = io.BytesIO() # figure.savefig(buf, format='png', bbox_inches='tight') # buf.seek(0) # plots.append(buf.read()) # plt.close('all') # try: # # Create comprehensive execution context with data analysis libraries # exec_globals = { # # Core data analysis # 'pd': pd, # 'np': np, # 'df': self.df, # # Visualization # 'plt': plt, # 'sns': sns, # # Statistics # 'stats': stats, # # Date/time # 'datetime': datetime, # 'timedelta': timedelta, # 'time': time, # # Utilities # 'json': json, # '__builtins__': __builtins__, # } # # Replace plt.show with custom implementation # plt.show = custom_show # # Execute code and capture output # with contextlib.redirect_stdout(stdout): # exec(code, exec_globals) # output = stdout.getvalue() # except Exception as e: # error = { # "message": str(e), # "traceback": traceback.format_exc() # } # finally: # # Restore original plt.show # plt.show = original_show # return { # 'output': output, # 'error': error, # 'plots': plots # } # def _convert_dataframe_to_text(self, df: pd.DataFrame) -> str: # """ # Convert pandas DataFrame to a text format that can be easily rendered # in the frontend using the ScrollableTableRenderer component. # Args: # df (pd.DataFrame): DataFrame to convert # Returns: # str: Text representation of the DataFrame # """ # # Convert DataFrame to string with proper formatting # df_str = df.to_string(index=True) # # Split into lines and clean up # lines = df_str.split('\n') # # Remove any trailing whitespace from each line # cleaned_lines = [line.rstrip() for line in lines] # # Join back with newlines # return '\n'.join(cleaned_lines) # async def save_plot_to_supabase(self, plot_data: bytes, description: str, chat_id: str) -> str: # """ # Save plot to Supabase storage and return the public URL # Args: # plot_data (bytes): Image data in bytes # description (str): Description of the plot # chat_id (str): ID of the chat session # Returns: # str: Public URL of the uploaded chart # """ # # Generate unique filename # filename = f"chart_{uuid.uuid4().hex}.png" # filepath = self.charts_folder / filename # # Save the plot locally first # with open(filepath, 'wb') as f: # f.write(plot_data) # try: # # Upload to Supabase # public_url = await upload_file_to_supabase( # file_path=str(filepath), # file_name=filename, # chat_id=chat_id # ) # # Remove the local file after upload # os.remove(filepath) # return public_url # except Exception as e: # # Clean up local file if upload fails # if os.path.exists(filepath): # os.remove(filepath) # raise Exception(f"Failed to upload plot to Supabase: {e}") # def _looks_like_structured_data(self, output: str) -> bool: # """Helper to detect JSON-like or array-like output""" # output = output.strip() # return ( # output.startswith('{') and output.endswith('}') or # JSON object # output.startswith('[') and output.endswith(']') or # Array # '\n' in output and '=' in output # Python console output # ) # def _is_dataframe_output(self, output: str) -> bool: # """Helper to detect if output looks like a pandas DataFrame""" # lines = output.strip().split('\n') # if len(lines) < 2: # return False # # Check for typical DataFrame header pattern # first_line = lines[0].strip() # second_line = lines[1].strip() # # Look for column headers and separator line # if not first_line or not second_line: # return False # # Check if the first line contains column names # # and the second line has some alignment characters # return True # async def process_response(self, response: CsvChatResult, chat_id: str) -> str: # """ # Process the CsvChatResult response and generate formatted output # with markdown code blocks for structured data. # Args: # response (CsvChatResult): Response from CSV analysis # chat_id (str): ID of the chat session # Returns: # str: Formatted output with results and image URLs # """ # output_parts = [] # # Add casual response # output_parts.append(response.casual_response) # # Process analysis operations # for operation in response.analysis_operations: # # Execute the code # result = self.execute_code(operation.code.code) # # Add operation description # output_parts.append(f"\n{operation.description}:") # # Add output or error with markdown wrapping # if result['error']: # output_parts.append("```python\n" + f"Error: {result['error']['message']}" + "\n```") # else: # output = result['output'].strip() # # Check if the output is a DataFrame-like structure # if self._is_dataframe_output(output): # # Convert to a clean text format for frontend rendering # try: # # Get the last evaluated expression which might be the DataFrame # # This is a simple approach - in practice you might need a more robust way # # to capture the actual DataFrame from the execution context # df_output = self._convert_dataframe_to_text(eval(operation.code.code.split('\n')[-1], globals(), locals())) # output_parts.append("```text\n" + df_output + "\n```") # except: # # Fall back to regular output if we can't convert it # output_parts.append("```text\n" + output + "\n```") # elif self._looks_like_structured_data(output): # output_parts.append("```python\n" + output + "\n```") # else: # output_parts.append(output) # # Process charts # if response.charts: # output_parts.append("\nVisualizations:") # for chart in response.charts: # if chart.code: # result = self.execute_code(chart.code) # if result['plots']: # for plot_data in result['plots']: # try: # public_url = await self.save_plot_to_supabase( # plot_data=plot_data, # description=chart.image_description, # chat_id=chat_id # ) # output_parts.append(f"\n{chart.image_description}") # output_parts.append(f"![{chart.image_description}]({public_url})") # except Exception as e: # output_parts.append(f"\nError uploading chart: {str(e)}") # elif result['error']: # output_parts.append("```python\n" + f"Error generating {chart.image_description}: {result['error']['message']}" + "\n```") # return "\n".join(output_parts)