""" Data Analysis Platform Copyright (c) 2025 JEAN YOUNG All rights reserved. This software is proprietary and confidential. Unauthorized copying, distribution, or use is prohibited. """ import streamlit as st import pandas as pd import numpy as np import warnings from typing import Dict, List, Any, Tuple from scipy import stats warnings.filterwarnings('ignore') # All cached data processing functions @st.cache_data def load_csv_with_encoding(file_content: bytes, filename: str) -> pd.DataFrame: """Load CSV with automatic encoding detection - cached""" import chardet detected = chardet.detect(file_content) encoding = detected['encoding'] try: from io import BytesIO return pd.read_csv(BytesIO(file_content), encoding=encoding) except: encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1'] for enc in encodings: try: return pd.read_csv(BytesIO(file_content), encoding=enc) except: continue raise Exception("Cannot read file with any encoding") @st.cache_data def load_excel_file(file_content: bytes) -> pd.DataFrame: """Load Excel file - cached""" from io import BytesIO return pd.read_excel(BytesIO(file_content)) @st.cache_data def calculate_basic_stats(df: pd.DataFrame) -> Dict[str, Any]: """Calculate basic statistics - cached""" dtype_counts = df.dtypes.value_counts() dtype_dict = {str(k): int(v) for k, v in dtype_counts.items()} return { 'shape': df.shape, 'memory_usage': float(df.memory_usage(deep=True).sum() / 1024**2), 'missing_values': int(df.isnull().sum().sum()), 'dtypes': dtype_dict, 'duplicates': int(df.duplicated().sum()) } @st.cache_data def calculate_column_cardinality(df: pd.DataFrame) -> pd.DataFrame: """Calculate column cardinality analysis - cached""" cardinality_data = [] for col in df.columns: unique_count = df[col].nunique() unique_ratio = unique_count / len(df) # Determine column type based on cardinality if unique_count == 1: col_type = "Constant" elif unique_count == len(df): col_type = "Unique Identifier" elif unique_ratio < 0.05: col_type = "Low Cardinality" elif unique_ratio < 0.5: col_type = "Medium Cardinality" else: col_type = "High Cardinality" cardinality_data.append({ 'Column': col, 'Unique Count': unique_count, 'Unique Ratio': unique_ratio, 'Type': col_type, 'Data Type': str(df[col].dtype) }) return pd.DataFrame(cardinality_data) @st.cache_data def calculate_memory_optimization(df: pd.DataFrame) -> Dict[str, Any]: """Calculate memory optimization suggestions - cached""" suggestions = [] current_memory = df.memory_usage(deep=True).sum() / 1024**2 potential_savings = 0 for col in df.columns: if df[col].dtype == 'object': unique_ratio = df[col].nunique() / len(df) if unique_ratio < 0.5: # Less than 50% unique values # Estimate category memory usage category_memory = df[col].astype('category').memory_usage(deep=True) object_memory = df[col].memory_usage(deep=True) savings = (object_memory - category_memory) / 1024**2 if savings > 0.1: # More than 0.1MB savings suggestions.append({ 'column': col, 'current_type': 'object', 'suggested_type': 'category', 'savings_mb': savings }) potential_savings += savings return { 'suggestions': suggestions, 'current_memory_mb': current_memory, 'potential_savings_mb': potential_savings, 'potential_savings_pct': (potential_savings / current_memory) * 100 if current_memory > 0 else 0 } @st.cache_data def calculate_missing_data(df: pd.DataFrame) -> pd.DataFrame: """Calculate missing data analysis - cached""" missing_data = df.isnull().sum() if missing_data.sum() > 0: missing_df = pd.DataFrame({ 'Column': missing_data.index, 'Missing Count': missing_data.values, 'Missing %': (missing_data.values / len(df)) * 100 }) return missing_df[missing_df['Missing Count'] > 0].sort_values('Missing %', ascending=False) return pd.DataFrame() @st.cache_data def calculate_correlation_matrix(df: pd.DataFrame) -> pd.DataFrame: """Calculate correlation matrix - cached""" numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist() return df[numeric_cols].corr() if len(numeric_cols) > 1 else pd.DataFrame() @st.cache_data def get_column_types(df: pd.DataFrame) -> Dict[str, List[str]]: """Get column types - cached""" return { 'numeric': df.select_dtypes(include=[np.number]).columns.tolist(), 'categorical': df.select_dtypes(include=['object']).columns.tolist(), 'datetime': df.select_dtypes(include=['datetime64']).columns.tolist() } @st.cache_data def calculate_numeric_stats(df: pd.DataFrame, column: str) -> Dict[str, float]: """Calculate enhanced numeric statistics - cached""" series = df[column].dropna() return { 'mean': series.mean(), 'median': series.median(), 'std': series.std(), 'skewness': series.skew(), 'kurtosis': series.kurtosis(), 'min': series.min(), 'max': series.max(), 'q25': series.quantile(0.25), 'q75': series.quantile(0.75) } @st.cache_data def calculate_outliers(df: pd.DataFrame, column: str) -> pd.DataFrame: """Calculate outliers using IQR method - cached""" Q1 = df[column].quantile(0.25) Q3 = df[column].quantile(0.75) IQR = Q3 - Q1 lower_bound = Q1 - 1.5 * IQR upper_bound = Q3 + 1.5 * IQR return df[(df[column] < lower_bound) | (df[column] > upper_bound)] @st.cache_data def detect_mixed_types(df: pd.DataFrame) -> List[Dict[str, Any]]: """Detect columns with mixed data types - cached""" mixed_type_issues = [] for col in df.select_dtypes(include=['object']).columns: # Try to convert to numeric numeric_conversion = pd.to_numeric(df[col], errors='coerce') new_nulls = numeric_conversion.isnull().sum() - df[col].isnull().sum() if new_nulls > 0: mixed_type_issues.append({ 'column': col, 'problematic_values': new_nulls, 'total_values': len(df[col]), 'percentage': (new_nulls / len(df[col])) * 100 }) return mixed_type_issues @st.cache_data def get_value_counts(df: pd.DataFrame, column: str, top_n: int = 10) -> pd.Series: """Get value counts for categorical column - cached""" return df[column].value_counts().head(top_n) @st.cache_data def calculate_crosstab(df: pd.DataFrame, col1: str, col2: str) -> pd.DataFrame: """Calculate crosstab between two categorical columns - cached""" return pd.crosstab(df[col1], df[col2]) @st.cache_data def calculate_group_stats(df: pd.DataFrame, group_col: str, metric_col: str) -> pd.DataFrame: """Calculate group statistics - cached""" return df.groupby(group_col)[metric_col].agg(['mean', 'median', 'std', 'count']) @st.cache_data def calculate_data_quality_score(df: pd.DataFrame) -> Dict[str, Any]: """Calculate overall data quality score - cached""" score = 100 issues = [] # Missing values penalty missing_pct = (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100 if missing_pct > 0: penalty = min(30, missing_pct * 2) # Max 30 points penalty score -= penalty issues.append(f"Missing values: {missing_pct:.1f}%") # Duplicates penalty duplicate_pct = (df.duplicated().sum() / len(df)) * 100 if duplicate_pct > 0: penalty = min(20, duplicate_pct * 4) # Max 20 points penalty score -= penalty issues.append(f"Duplicate rows: {duplicate_pct:.1f}%") # Constant columns penalty constant_cols = [col for col in df.columns if df[col].nunique() == 1] if constant_cols: penalty = min(10, len(constant_cols) * 2) score -= penalty issues.append(f"Constant columns: {len(constant_cols)}") # Mixed types penalty mixed_types = detect_mixed_types(df) if mixed_types: penalty = min(10, len(mixed_types) * 3) score -= penalty issues.append(f"Mixed type columns: {len(mixed_types)}") return { 'score': max(0, score), 'issues': issues, 'grade': 'A' if score >= 90 else 'B' if score >= 80 else 'C' if score >= 70 else 'D' if score >= 60 else 'F' } def load_data(uploaded_file): """Unified data loading function""" file_content = uploaded_file.read() uploaded_file.seek(0) if uploaded_file.name.endswith('.csv'): return load_csv_with_encoding(file_content, uploaded_file.name) else: return load_excel_file(file_content) def apply_data_cleaning(df: pd.DataFrame, operations: List[Dict[str, Any]]) -> pd.DataFrame: """Apply data cleaning operations""" cleaned_df = df.copy() for operation in operations: if operation['type'] == 'fill_missing': if operation['method'] == 'mean': cleaned_df[operation['column']] = cleaned_df[operation['column']].fillna( cleaned_df[operation['column']].mean()) elif operation['method'] == 'median': cleaned_df[operation['column']] = cleaned_df[operation['column']].fillna( cleaned_df[operation['column']].median()) elif operation['method'] == 'mode': cleaned_df[operation['column']] = cleaned_df[operation['column']].fillna( cleaned_df[operation['column']].mode().iloc[0] if not cleaned_df[operation['column']].mode().empty else 0) elif operation['method'] == 'drop': cleaned_df = cleaned_df.dropna(subset=[operation['column']]) elif operation['type'] == 'remove_duplicates': cleaned_df = cleaned_df.drop_duplicates() elif operation['type'] == 'remove_outliers': Q1 = cleaned_df[operation['column']].quantile(0.25) Q3 = cleaned_df[operation['column']].quantile(0.75) IQR = Q3 - Q1 lower_bound = Q1 - 1.5 * IQR upper_bound = Q3 + 1.5 * IQR cleaned_df = cleaned_df[ (cleaned_df[operation['column']] >= lower_bound) & (cleaned_df[operation['column']] <= upper_bound) ] elif operation['type'] == 'cap_outliers': Q1 = cleaned_df[operation['column']].quantile(0.25) Q3 = cleaned_df[operation['column']].quantile(0.75) IQR = Q3 - Q1 lower_bound = Q1 - 1.5 * IQR upper_bound = Q3 + 1.5 * IQR cleaned_df[operation['column']] = cleaned_df[operation['column']].clip(lower_bound, upper_bound) elif operation['type'] == 'convert_type': if operation['target_type'] == 'category': cleaned_df[operation['column']] = cleaned_df[operation['column']].astype('category') return cleaned_df