# Standard library
import contextlib
import io
import json
import os
import time
import traceback
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

# Third-party
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats as stats
import seaborn as sns
from dotenv import load_dotenv
from pydantic import BaseModel

# Project-local
from supabase_service import upload_file_to_supabase

# Load environment variables from .env file
load_dotenv()
class CodeResponse(BaseModel):
    """Container for code-related responses."""

    # Language of the snippet stored in `code`; defaults to "python".
    language: str = "python"
    # Source text of the snippet.
    code: str
class ChartSpecification(BaseModel):
    """Details about a requested chart."""

    # Human-readable description of the chart image.
    image_description: str
    # Python source that draws the chart; may be omitted.
    code: Optional[str] = None
class AnalysisOperation(BaseModel):
    """Container for a single analysis operation: its code and a description."""

    # The snippet to run for this operation.
    code: CodeResponse
    # Short label describing what the operation computes.
    description: str
class CsvChatResult(BaseModel):
    """Structured response for CSV-related AI interactions."""

    # Expected values: "casual", "data_analysis", "visualization", "mixed".
    # Kept as a plain str, so the value is not validated against that set.
    response_type: str
    # Free-text part of the reply.
    casual_response: str
    # Analysis snippets to run, in order.
    analysis_operations: List[AnalysisOperation]
    # Optional chart requests.
    charts: Optional[List[ChartSpecification]] = None
class PythonExecutor:
    """Handles execution of Python code with comprehensive data analysis libraries.

    Snippets run against a single DataFrame exposed to them as ``df``. Printed
    output is captured as text, and matplotlib figures are captured as PNG
    bytes that can then be uploaded to Supabase storage.
    """

    def __init__(self, df: pd.DataFrame, charts_folder: str = "generated_charts"):
        """
        Initialize the PythonExecutor with a DataFrame.

        Args:
            df (pd.DataFrame): The DataFrame to operate on
            charts_folder (str): Folder to save charts in; created if missing
        """
        self.df = df
        self.charts_folder = Path(charts_folder)
        # parents=True so a nested folder such as "out/charts" also works.
        self.charts_folder.mkdir(parents=True, exist_ok=True)

    def execute_code(self, code: str) -> Dict[str, Any]:
        """
        Execute Python code with full data analysis context and return results.

        While the snippet runs, ``plt.show`` is replaced by a function that
        renders every open figure to PNG bytes instead of displaying it.
        Figures still open when the snippet finishes successfully are captured
        the same way, so a snippet does not have to call ``plt.show()``.

        Args:
            code (str): Python code to execute

        Returns:
            dict: ``'output'`` (captured stdout; empty string if the snippet
            raised), ``'error'`` (``None``, or a dict with ``'message'`` and
            ``'traceback'``) and ``'plots'`` (list of PNG images as bytes).
        """
        output = ""
        error = None
        plots = []
        stdout = io.StringIO()

        # NOTE: plt.show is patched on the shared pyplot module, so this is
        # presumably not safe to run concurrently from several threads.
        original_show = plt.show

        def capture_open_figures(*args, **kwargs):
            """Save all open figures to `plots` instead of displaying them.

            Accepts and ignores arguments so that calls such as
            ``plt.show(block=False)`` inside a snippet keep working.
            """
            for fig_num in plt.get_fignums():
                figure = plt.figure(fig_num)
                buf = io.BytesIO()
                figure.savefig(buf, format='png', bbox_inches='tight')
                plots.append(buf.getvalue())
            plt.close('all')

        # Names made available to the executed snippet.
        exec_globals = {
            # Core data analysis
            'pd': pd,
            'np': np,
            'df': self.df,
            # Visualization
            'plt': plt,
            'sns': sns,
            # Statistics
            'stats': stats,
            # Date/time
            'datetime': datetime,
            'timedelta': timedelta,
            'time': time,
            # Utilities
            'json': json,
            '__builtins__': __builtins__,
        }

        try:
            plt.show = capture_open_figures
            with contextlib.redirect_stdout(stdout):
                # SECURITY: exec() runs the snippet with full builtins and no
                # sandboxing. Only pass code from a trusted source or run this
                # in an isolated environment.
                exec(code, exec_globals)
                # Pick up figures the snippet drew without calling plt.show().
                capture_open_figures()
            output = stdout.getvalue()
        except Exception as e:
            error = {
                "message": str(e),
                "traceback": traceback.format_exc()
            }
        finally:
            # Restore original plt.show and drop any half-drawn figures so
            # they cannot leak into the next execution.
            plt.show = original_show
            plt.close('all')

        return {
            'output': output,
            'error': error,
            'plots': plots
        }

    async def save_plot_to_supabase(self, plot_data: bytes, description: str, chat_id: str) -> str:
        """
        Save plot to Supabase storage and return the public URL.

        The image is written to a uniquely named temporary file inside
        ``charts_folder``, uploaded, and the local file is removed afterwards
        whether or not the upload succeeded.

        Args:
            plot_data (bytes): Image data in bytes
            description (str): Description of the plot (currently unused here)
            chat_id (str): ID of the chat session

        Returns:
            str: Public URL of the uploaded chart

        Raises:
            Exception: If the upload fails; the original error is chained.
        """
        filename = f"chart_{uuid.uuid4().hex}.png"
        filepath = self.charts_folder / filename

        try:
            # The upload helper takes a file path, so write the bytes to disk.
            with open(filepath, 'wb') as f:
                f.write(plot_data)

            try:
                public_url = await upload_file_to_supabase(
                    file_path=str(filepath),
                    file_name=filename,
                    chat_id=chat_id
                )
            except Exception as e:
                raise Exception(f"Failed to upload plot to Supabase: {e}") from e

            return public_url
        finally:
            # Best-effort cleanup: a leftover temp file must not turn a
            # successful upload into a failure or mask the upload error.
            with contextlib.suppress(OSError):
                os.remove(filepath)

    def _looks_like_structured_data(self, output: str) -> bool:
        """Helper to detect JSON-like, array-like or console-style output.

        Returns True when the stripped text is wrapped in ``{...}`` or
        ``[...]``, or when it spans several lines and contains ``=``.
        """
        text = output.strip()
        if text.startswith('{') and text.endswith('}'):  # JSON object
            return True
        if text.startswith('[') and text.endswith(']'):  # Array
            return True
        # Python console output
        return '\n' in text and '=' in text

    async def process_response(self, response: "CsvChatResult", chat_id: str) -> str:
        """
        Process the CsvChatResult response and generate formatted output
        with markdown code blocks for structured data.

        Each analysis operation is executed and its output (or error message)
        is appended under its description. Each chart that has code is
        executed, every captured plot is uploaded, and a Markdown image link
        to the uploaded file is appended.

        Args:
            response (CsvChatResult): Response from CSV analysis
            chat_id (str): ID of the chat session

        Returns:
            str: Formatted output with results and image URLs
        """
        output_parts = [response.casual_response]

        # Process analysis operations
        for operation in response.analysis_operations:
            result = self.execute_code(operation.code.code)
            output_parts.append(f"\n{operation.description}:")

            if result['error']:
                output_parts.append("```python\n" + f"Error: {result['error']['message']}" + "\n```")
                continue

            output = result['output'].strip()
            if self._looks_like_structured_data(output):
                output_parts.append("```python\n" + output + "\n```")
            else:
                output_parts.append(output)

        # Process charts
        if response.charts:
            output_parts.append("\nVisualizations:")
            for chart in response.charts:
                if not chart.code:
                    continue

                result = self.execute_code(chart.code)
                if result['plots']:
                    for plot_data in result['plots']:
                        try:
                            public_url = await self.save_plot_to_supabase(
                                plot_data=plot_data,
                                description=chart.image_description,
                                chat_id=chat_id
                            )
                        except Exception as e:
                            output_parts.append(f"\nError uploading chart: {str(e)}")
                        else:
                            output_parts.append(f"\n{chart.image_description}")
                            # Embed the uploaded image so the URL actually
                            # reaches the caller.
                            output_parts.append(f"![{chart.image_description}]({public_url})")
                elif result['error']:
                    output_parts.append("```python\n" + f"Error generating {chart.image_description}: {result['error']['message']}" + "\n```")

        return "\n".join(output_parts)
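# Table formatter: disabled alternative version of this module (kept commented out below) that additionally renders DataFrame-like output as text tables.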
# import os | |
# from dotenv import load_dotenv | |
# import uuid | |
# import matplotlib.pyplot as plt | |
# from pathlib import Path | |
# from typing import Dict, Any, List, Optional | |
# import pandas as pd | |
# import numpy as np | |
# import json | |
# import io | |
# import contextlib | |
# import traceback | |
# import time | |
# from datetime import datetime, timedelta | |
# import seaborn as sns | |
# import scipy.stats as stats | |
# from pydantic import BaseModel | |
# from supabase_service import upload_file_to_supabase | |
# # Load environment variables from .env file | |
# load_dotenv() | |
# class CodeResponse(BaseModel): | |
# """Container for code-related responses""" | |
# language: str = "python" | |
# code: str | |
# class ChartSpecification(BaseModel): | |
# """Details about requested charts""" | |
# image_description: str | |
# code: Optional[str] = None | |
# class AnalysisOperation(BaseModel): | |
# """Container for a single analysis operation with its code and result""" | |
# code: CodeResponse | |
# description: str | |
# class CsvChatResult(BaseModel): | |
# """Structured response for CSV-related AI interactions""" | |
# response_type: str # Literal["casual", "data_analysis", "visualization", "mixed"] | |
# casual_response: str | |
# analysis_operations: List[AnalysisOperation] | |
# charts: Optional[List[ChartSpecification]] = None | |
# class PythonExecutor: | |
# """Handles execution of Python code with comprehensive data analysis libraries""" | |
# def __init__(self, df: pd.DataFrame, charts_folder: str = "generated_charts"): | |
# """ | |
# Initialize the PythonExecutor with a DataFrame | |
# Args: | |
# df (pd.DataFrame): The DataFrame to operate on | |
# charts_folder (str): Folder to save charts in | |
# """ | |
# self.df = df | |
# self.charts_folder = Path(charts_folder) | |
# self.charts_folder.mkdir(exist_ok=True) | |
# def execute_code(self, code: str) -> Dict[str, Any]: | |
# """ | |
# Execute Python code with full data analysis context and return results | |
# Args: | |
# code (str): Python code to execute | |
# Returns: | |
# dict: Dictionary containing execution results and any generated plots | |
# """ | |
# output = "" | |
# error = None | |
# plots = [] | |
# # Capture stdout | |
# stdout = io.StringIO() | |
# # Monkey patch plt.show() to save figures | |
# original_show = plt.show | |
# def custom_show(): | |
# """Custom show function that saves plots instead of displaying them""" | |
# for i, fig in enumerate(plt.get_fignums()): | |
# figure = plt.figure(fig) | |
# # Save plot to bytes buffer | |
# buf = io.BytesIO() | |
# figure.savefig(buf, format='png', bbox_inches='tight') | |
# buf.seek(0) | |
# plots.append(buf.read()) | |
# plt.close('all') | |
# try: | |
# # Create comprehensive execution context with data analysis libraries | |
# exec_globals = { | |
# # Core data analysis | |
# 'pd': pd, | |
# 'np': np, | |
# 'df': self.df, | |
# # Visualization | |
# 'plt': plt, | |
# 'sns': sns, | |
# # Statistics | |
# 'stats': stats, | |
# # Date/time | |
# 'datetime': datetime, | |
# 'timedelta': timedelta, | |
# 'time': time, | |
# # Utilities | |
# 'json': json, | |
# '__builtins__': __builtins__, | |
# } | |
# # Replace plt.show with custom implementation | |
# plt.show = custom_show | |
# # Execute code and capture output | |
# with contextlib.redirect_stdout(stdout): | |
# exec(code, exec_globals) | |
# output = stdout.getvalue() | |
# except Exception as e: | |
# error = { | |
# "message": str(e), | |
# "traceback": traceback.format_exc() | |
# } | |
# finally: | |
# # Restore original plt.show | |
# plt.show = original_show | |
# return { | |
# 'output': output, | |
# 'error': error, | |
# 'plots': plots | |
# } | |
# def _convert_dataframe_to_text(self, df: pd.DataFrame) -> str: | |
# """ | |
# Convert pandas DataFrame to a text format that can be easily rendered | |
# in the frontend using the ScrollableTableRenderer component. | |
# Args: | |
# df (pd.DataFrame): DataFrame to convert | |
# Returns: | |
# str: Text representation of the DataFrame | |
# """ | |
# # Convert DataFrame to string with proper formatting | |
# df_str = df.to_string(index=True) | |
# # Split into lines and clean up | |
# lines = df_str.split('\n') | |
# # Remove any trailing whitespace from each line | |
# cleaned_lines = [line.rstrip() for line in lines] | |
# # Join back with newlines | |
# return '\n'.join(cleaned_lines) | |
# async def save_plot_to_supabase(self, plot_data: bytes, description: str, chat_id: str) -> str: | |
# """ | |
# Save plot to Supabase storage and return the public URL | |
# Args: | |
# plot_data (bytes): Image data in bytes | |
# description (str): Description of the plot | |
# chat_id (str): ID of the chat session | |
# Returns: | |
# str: Public URL of the uploaded chart | |
# """ | |
# # Generate unique filename | |
# filename = f"chart_{uuid.uuid4().hex}.png" | |
# filepath = self.charts_folder / filename | |
# # Save the plot locally first | |
# with open(filepath, 'wb') as f: | |
# f.write(plot_data) | |
# try: | |
# # Upload to Supabase | |
# public_url = await upload_file_to_supabase( | |
# file_path=str(filepath), | |
# file_name=filename, | |
# chat_id=chat_id | |
# ) | |
# # Remove the local file after upload | |
# os.remove(filepath) | |
# return public_url | |
# except Exception as e: | |
# # Clean up local file if upload fails | |
# if os.path.exists(filepath): | |
# os.remove(filepath) | |
# raise Exception(f"Failed to upload plot to Supabase: {e}") | |
# def _looks_like_structured_data(self, output: str) -> bool: | |
# """Helper to detect JSON-like or array-like output""" | |
# output = output.strip() | |
# return ( | |
# output.startswith('{') and output.endswith('}') or # JSON object | |
# output.startswith('[') and output.endswith(']') or # Array | |
# '\n' in output and '=' in output # Python console output | |
# ) | |
# def _is_dataframe_output(self, output: str) -> bool: | |
# """Helper to detect if output looks like a pandas DataFrame""" | |
# lines = output.strip().split('\n') | |
# if len(lines) < 2: | |
# return False | |
# # Check for typical DataFrame header pattern | |
# first_line = lines[0].strip() | |
# second_line = lines[1].strip() | |
# # Look for column headers and separator line | |
# if not first_line or not second_line: | |
# return False | |
# # Check if the first line contains column names | |
# # and the second line has some alignment characters | |
# return True | |
# async def process_response(self, response: CsvChatResult, chat_id: str) -> str: | |
# """ | |
# Process the CsvChatResult response and generate formatted output | |
# with markdown code blocks for structured data. | |
# Args: | |
# response (CsvChatResult): Response from CSV analysis | |
# chat_id (str): ID of the chat session | |
# Returns: | |
# str: Formatted output with results and image URLs | |
# """ | |
# output_parts = [] | |
# # Add casual response | |
# output_parts.append(response.casual_response) | |
# # Process analysis operations | |
# for operation in response.analysis_operations: | |
# # Execute the code | |
# result = self.execute_code(operation.code.code) | |
# # Add operation description | |
# output_parts.append(f"\n{operation.description}:") | |
# # Add output or error with markdown wrapping | |
# if result['error']: | |
# output_parts.append("```python\n" + f"Error: {result['error']['message']}" + "\n```") | |
# else: | |
# output = result['output'].strip() | |
# # Check if the output is a DataFrame-like structure | |
# if self._is_dataframe_output(output): | |
# # Convert to a clean text format for frontend rendering | |
# try: | |
# # Get the last evaluated expression which might be the DataFrame | |
# # This is a simple approach - in practice you might need a more robust way | |
# # to capture the actual DataFrame from the execution context | |
# df_output = self._convert_dataframe_to_text(eval(operation.code.code.split('\n')[-1], globals(), locals())) | |
# output_parts.append("```text\n" + df_output + "\n```") | |
# except: | |
# # Fall back to regular output if we can't convert it | |
# output_parts.append("```text\n" + output + "\n```") | |
# elif self._looks_like_structured_data(output): | |
# output_parts.append("```python\n" + output + "\n```") | |
# else: | |
# output_parts.append(output) | |
# # Process charts | |
# if response.charts: | |
# output_parts.append("\nVisualizations:") | |
# for chart in response.charts: | |
# if chart.code: | |
# result = self.execute_code(chart.code) | |
# if result['plots']: | |
# for plot_data in result['plots']: | |
# try: | |
# public_url = await self.save_plot_to_supabase( | |
# plot_data=plot_data, | |
# description=chart.image_description, | |
# chat_id=chat_id | |
# ) | |
# output_parts.append(f"\n{chart.image_description}") | |
# output_parts.append(f"") | |
# except Exception as e: | |
# output_parts.append(f"\nError uploading chart: {str(e)}") | |
# elif result['error']: | |
# output_parts.append("```python\n" + f"Error generating {chart.image_description}: {result['error']['message']}" + "\n```") | |
# return "\n".join(output_parts) |