"""
Data Analysis Platform

Copyright (c) 2025 JEAN YOUNG
All rights reserved.

This software is proprietary and confidential.
Unauthorized copying, distribution, or use is prohibited.
"""

import streamlit as st
import pandas as pd
import numpy as np
import warnings
from typing import Dict, List, Any, Tuple
from scipy import stats

warnings.filterwarnings('ignore')


@st.cache_data
def load_csv_with_encoding(file_content: bytes, filename: str) -> pd.DataFrame:
    """Load CSV with automatic encoding detection - cached"""
    import chardet
    from io import BytesIO

    # chardet returns None for 'encoding' when detection fails.
    detected = chardet.detect(file_content)
    encoding = detected.get('encoding') or 'utf-8'

    try:
        return pd.read_csv(BytesIO(file_content), encoding=encoding)
    except ValueError:  # UnicodeDecodeError and ParserError both subclass ValueError
        # Fall back to a list of common encodings.
        for enc in ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']:
            try:
                return pd.read_csv(BytesIO(file_content), encoding=enc)
            except ValueError:
                continue
        raise ValueError(f"Cannot read {filename} with any supported encoding")
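
# Usage sketch (illustrative; "data.csv" is a hypothetical path):
#     with open("data.csv", "rb") as f:
#         df = load_csv_with_encoding(f.read(), "data.csv")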


@st.cache_data
def load_excel_file(file_content: bytes) -> pd.DataFrame:
    """Load Excel file - cached"""
    from io import BytesIO
    return pd.read_excel(BytesIO(file_content))


@st.cache_data
def calculate_basic_stats(df: pd.DataFrame) -> Dict[str, Any]:
    """Calculate basic statistics - cached"""
    dtype_counts = df.dtypes.value_counts()
    dtype_dict = {str(k): int(v) for k, v in dtype_counts.items()}

    return {
        'shape': df.shape,
        'memory_usage': float(df.memory_usage(deep=True).sum() / 1024**2),  # MB
        'missing_values': int(df.isnull().sum().sum()),
        'dtypes': dtype_dict,
        'duplicates': int(df.duplicated().sum())
    }
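
# Display sketch (hedged; assumes a loaded DataFrame `df` in a Streamlit page):
#     basic = calculate_basic_stats(df)
#     c1, c2 = st.columns(2)
#     c1.metric("Rows x Cols", f"{basic['shape'][0]} x {basic['shape'][1]}")
#     c2.metric("Memory (MB)", f"{basic['memory_usage']:.2f}")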


@st.cache_data
def calculate_column_cardinality(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate column cardinality analysis - cached"""
    cardinality_data = []
    n_rows = len(df)

    for col in df.columns:
        unique_count = df[col].nunique()
        # Guard against division by zero on an empty frame.
        unique_ratio = unique_count / n_rows if n_rows else 0.0

        if unique_count == 1:
            col_type = "Constant"
        elif unique_count == n_rows:
            col_type = "Unique Identifier"
        elif unique_ratio < 0.05:
            col_type = "Low Cardinality"
        elif unique_ratio < 0.5:
            col_type = "Medium Cardinality"
        else:
            col_type = "High Cardinality"

        cardinality_data.append({
            'Column': col,
            'Unique Count': unique_count,
            'Unique Ratio': unique_ratio,
            'Type': col_type,
            'Data Type': str(df[col].dtype)
        })

    return pd.DataFrame(cardinality_data)
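
# Threshold example: over 1,000 rows, a column with 3 distinct values has
# ratio 0.003 -> "Low Cardinality"; 600 distinct values -> 0.6 -> "High Cardinality".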


@st.cache_data
def calculate_memory_optimization(df: pd.DataFrame) -> Dict[str, Any]:
    """Calculate memory optimization suggestions - cached"""
    suggestions = []
    current_memory = df.memory_usage(deep=True).sum() / 1024**2
    potential_savings = 0.0
    n_rows = len(df)

    for col in df.columns:
        if df[col].dtype == 'object' and n_rows > 0:
            unique_ratio = df[col].nunique() / n_rows
            if unique_ratio < 0.5:
                # Both measurements include the index, so it cancels in the difference.
                category_memory = df[col].astype('category').memory_usage(deep=True)
                object_memory = df[col].memory_usage(deep=True)
                savings = (object_memory - category_memory) / 1024**2

                if savings > 0.1:  # only suggest conversions saving more than 0.1 MB
                    suggestions.append({
                        'column': col,
                        'current_type': 'object',
                        'suggested_type': 'category',
                        'savings_mb': savings
                    })
                    potential_savings += savings

    return {
        'suggestions': suggestions,
        'current_memory_mb': current_memory,
        'potential_savings_mb': potential_savings,
        'potential_savings_pct': (potential_savings / current_memory) * 100 if current_memory > 0 else 0
    }
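
# Applying the suggestions (sketch; mutates a copy, not the original frame):
#     opt = calculate_memory_optimization(df)
#     slim = df.copy()
#     for s in opt['suggestions']:
#         slim[s['column']] = slim[s['column']].astype(s['suggested_type'])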


@st.cache_data
def calculate_missing_data(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate missing data analysis - cached"""
    missing_data = df.isnull().sum()
    if missing_data.sum() > 0:
        missing_df = pd.DataFrame({
            'Column': missing_data.index,
            'Missing Count': missing_data.values,
            'Missing %': (missing_data.values / len(df)) * 100
        })
        return missing_df[missing_df['Missing Count'] > 0].sort_values('Missing %', ascending=False)
    return pd.DataFrame()


@st.cache_data
def calculate_correlation_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate correlation matrix - cached"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    return df[numeric_cols].corr() if len(numeric_cols) > 1 else pd.DataFrame()


@st.cache_data
def get_column_types(df: pd.DataFrame) -> Dict[str, List[str]]:
    """Get column types - cached"""
    return {
        'numeric': df.select_dtypes(include=[np.number]).columns.tolist(),
        'categorical': df.select_dtypes(include=['object']).columns.tolist(),
        'datetime': df.select_dtypes(include=['datetime64']).columns.tolist()
    }


@st.cache_data
def calculate_numeric_stats(df: pd.DataFrame, column: str) -> Dict[str, float]:
    """Calculate enhanced numeric statistics - cached"""
    series = df[column].dropna()
    return {
        'mean': series.mean(),
        'median': series.median(),
        'std': series.std(),
        'skewness': series.skew(),
        'kurtosis': series.kurtosis(),
        'min': series.min(),
        'max': series.max(),
        'q25': series.quantile(0.25),
        'q75': series.quantile(0.75)
    }


@st.cache_data
def calculate_outliers(df: pd.DataFrame, column: str) -> pd.DataFrame:
    """Calculate outliers using IQR method - cached"""
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    return df[(df[column] < lower_bound) | (df[column] > upper_bound)]
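
# IQR rule of thumb: with Q1=10 and Q3=20, IQR=10, so rows outside
# [10 - 15, 20 + 15] = [-5, 35] are flagged as outliers.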


@st.cache_data
def detect_mixed_types(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Detect columns with mixed data types - cached"""
    mixed_type_issues = []

    for col in df.select_dtypes(include=['object']).columns:
        converted = pd.to_numeric(df[col], errors='coerce')
        new_nulls = converted.isnull().sum() - df[col].isnull().sum()
        numeric_values = converted.notnull().sum()

        # A column is "mixed" only if some values parse as numbers and some do not;
        # purely textual columns are not flagged.
        if new_nulls > 0 and numeric_values > 0:
            mixed_type_issues.append({
                'column': col,
                'problematic_values': int(new_nulls),
                'total_values': len(df[col]),
                'percentage': (new_nulls / len(df[col])) * 100
            })

    return mixed_type_issues
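
# Example: an object column holding ['1', '2', 'x'] converts to [1, 2, NaN],
# so it is reported with 1 problematic value out of 3 (33.3%).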


@st.cache_data
def get_value_counts(df: pd.DataFrame, column: str, top_n: int = 10) -> pd.Series:
    """Get value counts for categorical column - cached"""
    return df[column].value_counts().head(top_n)


@st.cache_data
def calculate_crosstab(df: pd.DataFrame, col1: str, col2: str) -> pd.DataFrame:
    """Calculate crosstab between two categorical columns - cached"""
    return pd.crosstab(df[col1], df[col2])


@st.cache_data
def calculate_group_stats(df: pd.DataFrame, group_col: str, metric_col: str) -> pd.DataFrame:
    """Calculate group statistics - cached"""
    return df.groupby(group_col)[metric_col].agg(['mean', 'median', 'std', 'count'])
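
# Example: calculate_group_stats(df, 'region', 'sales') returns one row per
# region with the mean/median/std/count of 'sales' (column names hypothetical).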


@st.cache_data
def calculate_data_quality_score(df: pd.DataFrame) -> Dict[str, Any]:
    """Calculate overall data quality score - cached"""
    score = 100
    issues = []

    if df.empty:
        return {'score': 0, 'issues': ['Empty dataset'], 'grade': 'F'}

    # Missing values: up to 30 points
    missing_pct = (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100
    if missing_pct > 0:
        score -= min(30, missing_pct * 2)
        issues.append(f"Missing values: {missing_pct:.1f}%")

    # Duplicate rows: up to 20 points
    duplicate_pct = (df.duplicated().sum() / len(df)) * 100
    if duplicate_pct > 0:
        score -= min(20, duplicate_pct * 4)
        issues.append(f"Duplicate rows: {duplicate_pct:.1f}%")

    # Constant columns: up to 10 points
    constant_cols = [col for col in df.columns if df[col].nunique() == 1]
    if constant_cols:
        score -= min(10, len(constant_cols) * 2)
        issues.append(f"Constant columns: {len(constant_cols)}")

    # Mixed-type columns: up to 10 points
    mixed_types = detect_mixed_types(df)
    if mixed_types:
        score -= min(10, len(mixed_types) * 3)
        issues.append(f"Mixed type columns: {len(mixed_types)}")

    return {
        'score': max(0, score),
        'issues': issues,
        'grade': 'A' if score >= 90 else 'B' if score >= 80 else 'C' if score >= 70 else 'D' if score >= 60 else 'F'
    }
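
# Worked example: 5% missing (-10), 2% duplicates (-8), one constant
# column (-2) -> score 80, grade 'B'.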


def load_data(uploaded_file):
    """Unified data loading function"""
    file_content = uploaded_file.read()
    uploaded_file.seek(0)  # rewind so the buffer can be read again elsewhere

    if uploaded_file.name.lower().endswith('.csv'):
        return load_csv_with_encoding(file_content, uploaded_file.name)
    else:
        return load_excel_file(file_content)
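
# Wiring sketch (assumes a Streamlit page context):
#     uploaded = st.file_uploader("Upload data", type=["csv", "xlsx", "xls"])
#     if uploaded is not None:
#         df = load_data(uploaded)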


def apply_data_cleaning(df: pd.DataFrame, operations: List[Dict[str, Any]]) -> pd.DataFrame:
    """Apply data cleaning operations"""
    cleaned_df = df.copy()

    for operation in operations:
        if operation['type'] == 'fill_missing':
            col = operation['column']
            if operation['method'] == 'mean':
                cleaned_df[col] = cleaned_df[col].fillna(cleaned_df[col].mean())
            elif operation['method'] == 'median':
                cleaned_df[col] = cleaned_df[col].fillna(cleaned_df[col].median())
            elif operation['method'] == 'mode':
                mode = cleaned_df[col].mode()
                cleaned_df[col] = cleaned_df[col].fillna(mode.iloc[0] if not mode.empty else 0)
            elif operation['method'] == 'drop':
                cleaned_df = cleaned_df.dropna(subset=[col])

        elif operation['type'] == 'remove_duplicates':
            cleaned_df = cleaned_df.drop_duplicates()

        elif operation['type'] == 'remove_outliers':
            col = operation['column']
            Q1 = cleaned_df[col].quantile(0.25)
            Q3 = cleaned_df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            cleaned_df = cleaned_df[
                (cleaned_df[col] >= lower_bound) &
                (cleaned_df[col] <= upper_bound)
            ]

        elif operation['type'] == 'cap_outliers':
            col = operation['column']
            Q1 = cleaned_df[col].quantile(0.25)
            Q3 = cleaned_df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            cleaned_df[col] = cleaned_df[col].clip(lower_bound, upper_bound)

        elif operation['type'] == 'convert_type':
            if operation['target_type'] == 'category':
                cleaned_df[operation['column']] = cleaned_df[operation['column']].astype('category')

    return cleaned_df
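
# Example operations list (column names are hypothetical):
#     ops = [
#         {'type': 'fill_missing', 'column': 'age', 'method': 'median'},
#         {'type': 'remove_duplicates'},
#         {'type': 'cap_outliers', 'column': 'income'},
#         {'type': 'convert_type', 'column': 'region', 'target_type': 'category'},
#     ]
#     cleaned = apply_data_cleaning(df, ops)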