"""
Data Analysis Platform
Copyright (c) 2025 JEAN YOUNG
All rights reserved.

This software is proprietary and confidential.
Unauthorized copying, distribution, or use is prohibited.
"""

import os
from io import BytesIO
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from dotenv import load_dotenv

from data_handler import (
    calculate_basic_stats,
    calculate_column_cardinality,
    calculate_correlation_matrix,
    calculate_data_quality_score,
    calculate_group_stats,
    calculate_memory_optimization,
    calculate_missing_data,
    calculate_outliers,
    detect_mixed_types,
    get_column_types,
    get_value_counts,
)

# Load API keys and other settings from a local .env file, if present.
load_dotenv()

# Optional AI dependencies; AI features degrade gracefully when these are missing.
try:
    import openai
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False

try:
    import google.generativeai as genai
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False


class AIAssistant:
    """AI-powered analysis assistant"""

    def __init__(self):
        self.openai_key = os.getenv('OPENAI_API_KEY')
        self.gemini_key = os.getenv('GOOGLE_API_KEY')

        if self.gemini_key and GEMINI_AVAILABLE:
            genai.configure(api_key=self.gemini_key)
            self.gemini_model = genai.GenerativeModel('gemini-1.5-flash')

    def get_available_models(self) -> List[str]:
        """Get list of available AI models"""
        models = []
        if self.openai_key and OPENAI_AVAILABLE:
            models.append("OpenAI GPT")
        if self.gemini_key and GEMINI_AVAILABLE:
            models.append("Google Gemini")
        return models

    def analyze_insights(self, df: pd.DataFrame, insights: List[Dict], model: str = "Google Gemini") -> str:
        """Get AI analysis of insights"""
        summary = f"""
Dataset Summary:
- Shape: {df.shape}
- Columns: {list(df.columns)}
- Data types: {df.dtypes.value_counts().to_dict()}

Key Insights Found:
"""

        for insight in insights:
            summary += f"\n- {insight['insight']}"

        prompt = f"""
As a senior data scientist, analyze this dataset and provide:

1. Business implications of the findings
2. Potential opportunities or risks
3. Recommendations for decision-making
4. Suggestions for further analysis

{summary}

Provide actionable insights in a professional format.
"""

        try:
            if model == "Google Gemini" and hasattr(self, 'gemini_model'):
                response = self.gemini_model.generate_content(prompt)
                return response.text
            elif model == "OpenAI GPT" and self.openai_key:
                client = openai.OpenAI(api_key=self.openai_key)
                response = client.chat.completions.create(
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": prompt}]
                )
                return response.choices[0].message.content
            else:
                return "AI analysis not available. Please configure API keys."
        except Exception as e:
            return f"AI Analysis Error: {str(e)}"


class DataAnalysisWorkflow:
    """Staged data analysis workflow with paginated display"""

    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.stats = calculate_basic_stats(df)
        self.column_types = get_column_types(df)
        self.insights = []
        self.page_size = 1000
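
    # The pure data_handler computations are natural candidates for Streamlit
    # caching if recomputation on rerun becomes a bottleneck. A minimal sketch,
    # assuming the helpers are deterministic for a given frame:
    #
    #     @st.cache_data
    #     def _cached_stats(df: pd.DataFrame) -> Dict[str, Any]:
    #         return calculate_basic_stats(df)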

    def add_insight(self, insight: str, stage: int):
        """Add insight to analysis report"""
        self.insights.append({
            'stage': stage,
            'insight': insight,
            'timestamp': pd.Timestamp.now()
        })

    def get_paginated_data(self, page: int = 0) -> pd.DataFrame:
        """Get paginated data for display"""
        start_idx = page * self.page_size
        end_idx = start_idx + self.page_size
        return self.df.iloc[start_idx:end_idx]
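
    # Example: with page_size=1000, page=2 yields rows 2000-2999 (iloc slicing
    # is end-exclusive, and a short final page is returned as-is).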

    def stage_1_overview(self):
        """Stage 1: Data Overview"""
        st.subheader("📊 Data Overview")

        quality_metrics = calculate_data_quality_score(self.df)
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("Rows", f"{self.stats['shape'][0]:,}")
        with col2:
            st.metric("Columns", f"{self.stats['shape'][1]:,}")
        with col3:
            st.metric("Quality Score", f"{quality_metrics['score']:.1f}/100")
        with col4:
            st.metric("Grade", quality_metrics['grade'])

        if quality_metrics['issues']:
            st.warning("Quality Issues Found:")
            for issue in quality_metrics['issues']:
                st.write(f"• {issue}")

        st.subheader("Memory Analysis")
        memory_opt = calculate_memory_optimization(self.df)
        col1, col2 = st.columns(2)
        with col1:
            st.metric("Current Memory", f"{memory_opt['current_memory_mb']:.1f} MB")
        with col2:
            if memory_opt['potential_savings_mb'] > 0:
                st.metric("Potential Savings",
                          f"{memory_opt['potential_savings_mb']:.1f} MB",
                          f"{memory_opt['potential_savings_pct']:.1f}%")

        if st.button("Show Optimization Details"):
            st.dataframe(pd.DataFrame(memory_opt['suggestions']))

        st.subheader("Column Cardinality Analysis")
        cardinality_df = calculate_column_cardinality(self.df)

        col_types = cardinality_df['Type'].unique()
        selected_types = st.multiselect("Filter by Column Type",
                                        col_types,
                                        default=col_types)

        filtered_df = cardinality_df[cardinality_df['Type'].isin(selected_types)]
        st.dataframe(filtered_df, use_container_width=True)

        id_cols = filtered_df[filtered_df['Type'] == 'Unique Identifier']['Column'].tolist()
        if id_cols:
            st.info(f"🔍 Potential ID columns found: {', '.join(id_cols)}")

        const_cols = filtered_df[filtered_df['Type'] == 'Constant']['Column'].tolist()
        if const_cols:
            st.warning(f"⚠️ Constant columns found: {', '.join(const_cols)}")

        if self.stats['dtypes']:
            st.subheader("Data Types Distribution")
            # Keys are numpy dtype objects; stringify them for display.
            fig = px.pie(values=list(self.stats['dtypes'].values()),
                         names=[str(k) for k in self.stats['dtypes']],
                         title="Data Types")
            st.plotly_chart(fig, use_container_width=True)

        st.subheader("Sample Data")
        total_pages = (len(self.df) - 1) // self.page_size + 1

        if total_pages > 1:
            page = st.slider("Page", 0, total_pages - 1, 0)
            sample_data = self.get_paginated_data(page)
            st.write(f"Showing rows {page * self.page_size + 1} to {min((page + 1) * self.page_size, len(self.df))}")
        else:
            sample_data = self.df.head(10)

        st.dataframe(sample_data, use_container_width=True)

        missing_df = calculate_missing_data(self.df)
        if not missing_df.empty:
            st.subheader("Missing Values Analysis")
            st.dataframe(missing_df, use_container_width=True)

            worst_column = missing_df.iloc[0]['Column']
            worst_percentage = missing_df.iloc[0]['Missing %']
            self.add_insight(f"Column '{worst_column}' has highest missing data: {worst_percentage:.1f}%", 1)
        else:
            st.success("✅ No missing values found!")
            self.add_insight("Dataset has no missing values - excellent data quality", 1)

        if quality_metrics['score'] < 80:
            self.add_insight(f"Data quality needs improvement (Score: {quality_metrics['score']:.1f}/100)", 1)

        if memory_opt['potential_savings_pct'] > 20:
            self.add_insight(f"Potential memory optimization of {memory_opt['potential_savings_pct']:.1f}% identified", 1)

        if id_cols:
            self.add_insight(f"Found {len(id_cols)} potential ID columns", 1)

    def stage_2_exploration(self):
        """Stage 2: Exploratory Data Analysis"""
        st.subheader("📈 Exploratory Data Analysis")

        numeric_cols = self.column_types['numeric']
        categorical_cols = self.column_types['categorical']

        if numeric_cols:
            st.subheader("Numeric Variables")
            selected_numeric = st.selectbox("Select numeric column:", numeric_cols)

            col1, col2 = st.columns(2)
            with col1:
                fig = px.histogram(self.df, x=selected_numeric,
                                   title=f"Distribution of {selected_numeric}")
                st.plotly_chart(fig, use_container_width=True)

            with col2:
                fig = px.box(self.df, y=selected_numeric,
                             title=f"Box Plot of {selected_numeric}")
                st.plotly_chart(fig, use_container_width=True)

            st.subheader("Statistical Summary")
            summary_stats = self.df[numeric_cols].describe()
            st.dataframe(summary_stats, use_container_width=True)

        if len(numeric_cols) > 1:
            st.subheader("Correlation Analysis")
            corr_matrix = calculate_correlation_matrix(self.df)
            if not corr_matrix.empty:
                fig = px.imshow(corr_matrix, text_auto=True, aspect="auto",
                                title="Correlation Matrix")
                st.plotly_chart(fig, use_container_width=True)

                # Collect absolute correlations from the upper triangle only,
                # so each pair is counted once and the diagonal is skipped.
                corr_values = []
                for i in range(len(corr_matrix.columns)):
                    for j in range(i + 1, len(corr_matrix.columns)):
                        corr_values.append(abs(corr_matrix.iloc[i, j]))

                if corr_values:
                    max_corr = max(corr_values)
                    self.add_insight(f"Maximum correlation coefficient: {max_corr:.3f}", 2)

        if categorical_cols:
            st.subheader("Categorical Variables")
            selected_categorical = st.selectbox("Select categorical column:", categorical_cols)

            value_counts = get_value_counts(self.df, selected_categorical)
            fig = px.bar(x=value_counts.index, y=value_counts.values,
                         title=f"Top 10 {selected_categorical} Values")
            st.plotly_chart(fig, use_container_width=True)

            total_categories = self.df[selected_categorical].nunique()
            self.add_insight(f"Column '{selected_categorical}' has {total_categories} unique categories", 2)

    def stage_3_cleaning(self):
        """Stage 3: Data Quality Assessment"""
        st.subheader("🧹 Data Quality Assessment")

        cleaning_actions = []
        # Share the history with report generation (see generate_python_code).
        cleaning_history = self.cleaning_history = []

        if self.stats['missing_values'] > 0:
            st.subheader("Missing Values Treatment")
            missing_df = calculate_missing_data(self.df)
            st.dataframe(missing_df, use_container_width=True)

            col1, col2 = st.columns(2)
            with col1:
                selected_col = st.selectbox("Select column to handle missing values:",
                                            missing_df['Column'].tolist())
            with col2:
                fill_method = st.selectbox("Choose fill method:",
                                           ["Drop rows", "Mean", "Median", "Mode", "Custom value"])

            # Render the custom-value input before the button: a widget created
            # inside the button branch would never hold a value when clicked.
            custom_value = 0.0
            if fill_method == "Custom value":
                custom_value = st.number_input("Enter custom value:", value=0.0)

            if st.button("Apply Missing Value Treatment"):
                try:
                    if fill_method == "Drop rows":
                        self.df = self.df.dropna(subset=[selected_col])
                        cleaning_history.append(f"Dropped rows with missing values in {selected_col}")
                    else:
                        if fill_method == "Mean":
                            fill_value = self.df[selected_col].mean()
                        elif fill_method == "Median":
                            fill_value = self.df[selected_col].median()
                        elif fill_method == "Mode":
                            fill_value = self.df[selected_col].mode()[0]
                        else:
                            fill_value = custom_value

                        self.df[selected_col] = self.df[selected_col].fillna(fill_value)
                        cleaning_history.append(f"Filled missing values in {selected_col} with {fill_method}")

                    st.success("✅ Missing values handled successfully!")
                except Exception as e:
                    st.error(f"Error handling missing values: {str(e)}")

        if self.stats['duplicates'] > 0:
            st.subheader("Duplicate Rows")
            st.warning(f"Found {self.stats['duplicates']} duplicate rows")

            if st.button("Remove Duplicate Rows"):
                original_len = len(self.df)
                self.df = self.df.drop_duplicates()
                removed = original_len - len(self.df)
                cleaning_history.append(f"Removed {removed} duplicate rows")
                st.success(f"✅ Removed {removed} duplicate rows")
        else:
            st.success("✅ No duplicate rows found")

        mixed_types = detect_mixed_types(self.df)
        if mixed_types:
            st.subheader("Mixed Data Types")
            mixed_df = pd.DataFrame(mixed_types)
            st.dataframe(mixed_df, use_container_width=True)

            selected_col = st.selectbox("Select column to fix data type:",
                                        [item['column'] for item in mixed_types])

            fix_method = st.selectbox("Choose fix method:",
                                      ["Convert to numeric", "Convert to string"])

            if st.button("Fix Data Type"):
                try:
                    if fix_method == "Convert to numeric":
                        # errors='coerce' turns unparseable entries into NaN.
                        self.df[selected_col] = pd.to_numeric(self.df[selected_col], errors='coerce')
                    else:
                        self.df[selected_col] = self.df[selected_col].astype(str)

                    cleaning_history.append(f"Fixed data type for {selected_col} to {fix_method}")
                    st.success("✅ Data type fixed successfully!")
                except Exception as e:
                    st.error(f"Error fixing data type: {str(e)}")

        numeric_cols = self.column_types['numeric']
        if numeric_cols:
            st.subheader("Outlier Detection")
            selected_col = st.selectbox("Select column for outlier detection:", numeric_cols)

            outliers = calculate_outliers(self.df, selected_col)
            outlier_count = len(outliers)

            if outlier_count > 0:
                st.warning(f"Found {outlier_count} potential outliers in '{selected_col}'")
                st.dataframe(outliers[[selected_col]].head(100), use_container_width=True)

                treatment_method = st.selectbox("Choose outlier treatment method:",
                                                ["None", "Remove", "Cap at percentiles"])

                if treatment_method != "None" and st.button("Apply Outlier Treatment"):
                    try:
                        if treatment_method == "Remove":
                            self.df = self.df[~self.df.index.isin(outliers.index)]
                            cleaning_history.append(f"Removed {outlier_count} outliers from {selected_col}")
                        else:
                            # Tukey's rule: cap values beyond 1.5 * IQR from
                            # the quartiles at the fence values.
                            Q1 = self.df[selected_col].quantile(0.25)
                            Q3 = self.df[selected_col].quantile(0.75)
                            IQR = Q3 - Q1
                            lower_bound = Q1 - 1.5 * IQR
                            upper_bound = Q3 + 1.5 * IQR

                            self.df[selected_col] = self.df[selected_col].clip(lower_bound, upper_bound)
                            cleaning_history.append(f"Capped outliers in {selected_col} at IQR fences")

                        st.success("✅ Outliers handled successfully!")
                    except Exception as e:
                        st.error(f"Error handling outliers: {str(e)}")
            else:
                st.success(f"✅ No outliers detected in '{selected_col}'")

        if cleaning_history:
            st.subheader("Cleaning Operations History")
            for i, operation in enumerate(cleaning_history, 1):
                st.write(f"{i}. {operation}")
            self.add_insight(f"Performed {len(cleaning_history)} data cleaning operations", 3)

        if cleaning_actions:
            st.subheader("Remaining Action Items")
            for i, action in enumerate(cleaning_actions, 1):
                st.write(f"{i}. {action}")
            self.add_insight(f"Identified {len(cleaning_actions)} data quality issues", 3)
        else:
            st.success("✅ Data quality is excellent!")
            self.add_insight("No major data quality issues found", 3)

    def stage_4_analysis(self):
        """Stage 4: Advanced Analysis"""
        st.subheader("🔬 Advanced Analysis")

        numeric_cols = self.column_types['numeric']
        categorical_cols = self.column_types['categorical']

        if len(numeric_cols) >= 2:
            st.subheader("Variable Relationships")

            col1, col2 = st.columns(2)
            with col1:
                x_var = st.selectbox("X Variable:", numeric_cols)
            with col2:
                y_var = st.selectbox("Y Variable:",
                                     [col for col in numeric_cols if col != x_var])

            # Sample large frames so the scatter plot stays responsive.
            sample_size = min(5000, len(self.df))
            sample_df = self.df.sample(n=sample_size) if len(self.df) > sample_size else self.df

            fig = px.scatter(sample_df, x=x_var, y=y_var,
                             title=f"Relationship: {x_var} vs {y_var}")
            st.plotly_chart(fig, use_container_width=True)

            correlation = self.df[x_var].corr(self.df[y_var])
            st.metric("Correlation", f"{correlation:.3f}")

            if abs(correlation) > 0.7:
                strength = "Strong"
            elif abs(correlation) > 0.3:
                strength = "Moderate"
            else:
                strength = "Weak"

            direction = "positive" if correlation > 0 else "negative"
            st.write(f"**Result:** {strength} {direction} correlation")
            self.add_insight(f"{strength} correlation ({correlation:.3f}) between {x_var} and {y_var}", 4)

        if categorical_cols and numeric_cols:
            st.subheader("Group Analysis")

            col1, col2 = st.columns(2)
            with col1:
                group_var = st.selectbox("Group by:", categorical_cols)
            with col2:
                metric_var = st.selectbox("Analyze:", numeric_cols)

            group_stats = calculate_group_stats(self.df, group_var, metric_var)
            st.dataframe(group_stats, use_container_width=True)

            unique_groups = self.df[group_var].nunique()
            if unique_groups <= 20:
                fig = px.box(self.df, x=group_var, y=metric_var,
                             title=f"{metric_var} by {group_var}")
                st.plotly_chart(fig, use_container_width=True)
            else:
                st.info(f"Too many groups ({unique_groups}) for visualization. Showing statistics only.")

            # Assumes calculate_group_stats returns a frame indexed by group
            # with a 'mean' column.
            best_group = group_stats['mean'].idxmax()
            best_value = group_stats.loc[best_group, 'mean']
            self.add_insight(f"'{best_group}' has highest average {metric_var}: {best_value:.2f}", 4)

    def stage_5_summary(self):
        """Stage 5: Summary and Export"""
        st.subheader("📋 Analysis Summary")

        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Total Insights", len(self.insights))
        with col2:
            quality = "High" if self.stats['missing_values'] == 0 else "Medium"
            st.metric("Data Quality", quality)
        with col3:
            st.metric("Analysis Complete", "✅")

        st.subheader("Key Insights")
        for i, insight in enumerate(self.insights, 1):
            st.write(f"{i}. **Stage {insight['stage']}:** {insight['insight']}")

        st.subheader("Export Results")
        export_format = st.selectbox("Choose export format:",
                                     ["Text Report", "Markdown Report", "Python Code", "Cleaned Data"])

        if export_format == "Text Report":
            report = self.generate_text_report()
            st.download_button(
                label="Download Text Report",
                data=report,
                file_name="analysis_report.txt",
                mime="text/plain"
            )

        elif export_format == "Markdown Report":
            report = self.generate_markdown_report()
            st.download_button(
                label="Download Markdown Report",
                data=report,
                file_name="analysis_report.md",
                mime="text/markdown"
            )

        elif export_format == "Python Code":
            code = self.generate_python_code()
            st.code(code, language="python")
            st.download_button(
                label="Download Python Script",
                data=code,
                file_name="analysis_script.py",
                mime="text/plain"
            )

        else:
            data_format = st.selectbox("Choose data format:",
                                       ["CSV", "Excel", "Parquet"])

            if st.button("Export Data"):
                try:
                    if data_format == "CSV":
                        csv = self.df.to_csv(index=False)
                        st.download_button(
                            label="Download CSV",
                            data=csv,
                            file_name="cleaned_data.csv",
                            mime="text/csv"
                        )
                    elif data_format == "Excel":
                        # Writing .xlsx requires an engine such as openpyxl.
                        excel_buffer = BytesIO()
                        self.df.to_excel(excel_buffer, index=False)
                        excel_data = excel_buffer.getvalue()
                        st.download_button(
                            label="Download Excel",
                            data=excel_data,
                            file_name="cleaned_data.xlsx",
                            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                        )
                    else:
                        # Parquet export needs pyarrow or fastparquet installed.
                        parquet_buffer = BytesIO()
                        self.df.to_parquet(parquet_buffer, index=False)
                        parquet_data = parquet_buffer.getvalue()
                        st.download_button(
                            label="Download Parquet",
                            data=parquet_data,
                            file_name="cleaned_data.parquet",
                            mime="application/octet-stream"
                        )
                except Exception as e:
                    st.error(f"Error exporting data: {str(e)}")

    def generate_text_report(self) -> str:
        """Generate text analysis report"""
        report = f"""DATA ANALYSIS REPORT
====================

Dataset Overview:
- Rows: {self.stats['shape'][0]:,}
- Columns: {self.stats['shape'][1]:,}
- Missing Values: {self.stats['missing_values']:,}
- Memory Usage: {self.stats['memory_usage']:.1f} MB

Key Insights:
"""
        for insight in self.insights:
            report += f"\n- Stage {insight['stage']}: {insight['insight']}"

        report += f"\n\nGenerated: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}"
        return report

    def generate_markdown_report(self) -> str:
        """Generate markdown analysis report"""
        # Note: DataFrame.to_markdown() requires the 'tabulate' package.
        report = f"""# Data Analysis Report

## Dataset Overview
* **Rows:** {self.stats['shape'][0]:,}
* **Columns:** {self.stats['shape'][1]:,}
* **Missing Values:** {self.stats['missing_values']:,}
* **Memory Usage:** {self.stats['memory_usage']:.1f} MB

## Data Types
```
{pd.DataFrame(self.stats['dtypes'].items(), columns=['Type', 'Count']).to_markdown()}
```

## Key Insights
"""

        for stage in range(1, 6):
            stage_insights = [i for i in self.insights if i['stage'] == stage]
            if stage_insights:
                report += f"\n### Stage {stage}\n"
                for insight in stage_insights:
                    report += f"* {insight['insight']}\n"

        report += f"\n\n*Generated: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}*"
        return report

    def generate_python_code(self) -> str:
        """Generate reproducible Python code"""
        code = """import pandas as pd
import numpy as np
import plotly.express as px
from typing import Dict, List, Any

# Load and prepare data
df = pd.read_csv('your_data.csv')  # Update with your data source

# Basic statistics
def calculate_basic_stats(df: pd.DataFrame) -> Dict[str, Any]:
    return {
        'shape': df.shape,
        'memory_usage': float(df.memory_usage(deep=True).sum() / 1024**2),
        'missing_values': int(df.isnull().sum().sum()),
        'dtypes': df.dtypes.value_counts().to_dict(),
        'duplicates': int(df.duplicated().sum())
    }

stats = calculate_basic_stats(df)
print("\\nBasic Statistics:")
print(f"- Shape: {stats['shape']}")
print(f"- Memory Usage: {stats['memory_usage']:.1f} MB")
print(f"- Missing Values: {stats['missing_values']}")
print(f"- Duplicates: {stats['duplicates']}")

"""

        if getattr(self, 'cleaning_history', None):
            code += "\n# Data Cleaning\n"
            for operation in self.cleaning_history:
                if "missing values" in operation.lower():
                    code += "# Handle missing values\n"
                    code += "df = df.ffill()  # Update with your chosen method\n"
                elif "duplicate" in operation.lower():
                    code += "# Remove duplicates\n"
                    code += "df = df.drop_duplicates()\n"
                elif "outlier" in operation.lower():
                    code += """# Handle outliers
def remove_outliers(df: pd.DataFrame, column: str) -> pd.DataFrame:
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    return df[~((df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR)))]

# Apply to numeric columns as needed
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
    df = remove_outliers(df, col)
"""

        code += """
# Visualizations
def plot_missing_values(df: pd.DataFrame):
    missing = df.isnull().sum()
    if missing.sum() > 0:
        missing = missing[missing > 0]
        fig = px.bar(x=missing.index, y=missing.values,
                     title='Missing Values by Column')
        fig.show()

def plot_correlations(df: pd.DataFrame):
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 1:
        corr = df[numeric_cols].corr()
        fig = px.imshow(corr, title='Correlation Matrix')
        fig.show()

# Generate plots
plot_missing_values(df)
plot_correlations(df)
"""

        return code
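

# --- Entry-point sketch ---
# A minimal way to wire the five stages into a Streamlit page. Illustrative
# only: it assumes the app is launched with `streamlit run` and fed a CSV
# upload; adapt the loading step to your data sources.
def main() -> None:
    st.set_page_config(page_title="Data Analysis Platform", layout="wide")
    st.title("Data Analysis Platform")

    uploaded = st.file_uploader("Upload a CSV file", type=["csv"])
    if uploaded is None:
        st.info("Upload a dataset to begin.")
        return

    df = pd.read_csv(uploaded)
    workflow = DataAnalysisWorkflow(df)

    tabs = st.tabs(["Overview", "Exploration", "Cleaning", "Analysis", "Summary"])
    with tabs[0]:
        workflow.stage_1_overview()
    with tabs[1]:
        workflow.stage_2_exploration()
    with tabs[2]:
        workflow.stage_3_cleaning()
    with tabs[3]:
        workflow.stage_4_analysis()
    with tabs[4]:
        workflow.stage_5_summary()


if __name__ == "__main__":
    main()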