Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 232 additions & 0 deletions hashprep/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ def analyze(self) -> Dict:
self._summarize_interactions()
self._summarize_missing_values()

# ---- Warnings and Critical Issues ----
self._check_data_leakage()
self._check_high_missing_values()
self._check_empty_columns()
self._check_single_value_columns()
self._check_target_leakage_patterns()
self._check_class_imbalance()
self._check_high_cardinality()
self._check_duplicates()
self._check_mixed_data_types()
self._check_outliers()
self._check_feature_correlation()

return self._generate_summary()

# =========================================================================
Expand Down Expand Up @@ -320,6 +333,224 @@ def _summarize_missing_values(self):
self.summaries["missing_values"] = {
"count": missing_count,
"percentage": missing_percentage,

}

# Simple missingness heatmap structure (list of missing row indexes)
self.summaries["missing_patterns"] = {
col: self.df[self.df[col].isna()].index.tolist()
for col in self.df.columns
if self.df[col].isna().any()
}

# =========================================================================
# Critical Issues & Warning Checks
# =========================================================================

def _check_data_leakage(self, target_col: str = None):
"""Check if any feature is a perfect duplicate of the target"""
if target_col and target_col in self.df.columns:
target = self.df[target_col]
for col in self.df.columns:
if col == target_col:
continue
if self.df[col].equals(target):
self.issues.append(
Issues(
category="data_leakage",
severity="critical",
column=col,
description=f"Column '{col}' is identical to target '{target_col}'",
impact_score="high",
quick_fix="Drop the column before training.",
)
)

def _check_high_missing_values(self, threshold: float = 0.4):
"""Flag columns with > threshold missing values"""
for col in self.df.columns:
missing_pct = self.df[col].isna().mean()
if missing_pct > threshold:
self.issues.append(
Issues(
category="missing_values",
severity="warning",
column=col,
description=f"{missing_pct:.1%} missing values in '{col}'",
impact_score="medium",
quick_fix="Consider imputing or dropping this column.",
)
)

def _check_empty_columns(self):
"""Detect columns that are entirely empty"""
for col in self.df.columns:
if self.df[col].notna().sum() == 0:
self.issues.append(
Issues(
category="empty_column",
severity="critical",
column=col,
description=f"Column '{col}' has no non-missing values",
impact_score="high",
quick_fix="Drop the column.",
)
)

def _check_single_value_columns(self):
"""Detect columns with only one unique value"""
for col in self.df.columns:
if self.df[col].nunique(dropna=True) == 1:
self.issues.append(
Issues(
category="single_value",
severity="warning",
column=col,
description=f"Column '{col}' contains only one unique value",
impact_score="low",
quick_fix="Drop this column (not informative).",
)
)

def _check_target_leakage_patterns(self, target_col: str = None):
"""
Detect columns that strongly correlate with target (possible leakage).
Works only if target_col is provided.
"""
if target_col and target_col in self.df.columns:
target = self.df[target_col]
numeric_cols = self.df.select_dtypes(include="number").drop(
columns=[target_col], errors="ignore"
)
if not numeric_cols.empty and pd.api.types.is_numeric_dtype(target):
corrs = numeric_cols.corrwith(target).abs()
for col, corr in corrs.items():
if corr > 0.95:
self.issues.append(
Issues(
category="target_leakage",
severity="critical",
column=col,
description=f"Column '{col}' highly correlated with target ({corr:.2f})",
impact_score="high",
quick_fix="Remove this column before training.",
)
)

def _check_class_imbalance(self, target_col: str = None, threshold: float = 0.9):
"""Check if target variable is highly imbalanced"""
if target_col and target_col in self.df.columns:
counts = self.df[target_col].value_counts(normalize=True)
if counts.iloc[0] > threshold:
self.issues.append(
Issues(
category="class_imbalance",
severity="warning",
column=target_col,
description=f"Target '{target_col}' is imbalanced ({counts.iloc[0]:.1%} in one class)",
impact_score="medium",
quick_fix="Consider stratified sampling, resampling, or class-weighted models.",
)
)

def _check_high_cardinality(self, threshold: int = 100):
"""Detect categorical columns with too many unique values"""
categorical_cols = self.df.select_dtypes(include="object").columns
for col in categorical_cols:
unique_count = self.df[col].nunique()
if unique_count > threshold:
self.issues.append(
Issues(
category="high_cardinality",
severity="warning",
column=col,
description=f"Column '{col}' has {unique_count} unique values",
impact_score="medium",
quick_fix="Consider feature hashing or grouping rare categories.",
)
)

def _check_duplicates(self):
"""Check for duplicate rows"""
duplicate_rows = self.df.duplicated().sum()
if duplicate_rows > 0:
self.issues.append(
Issues(
category="duplicates",
severity="warning",
column="__all__",
description=f"Dataset contains {duplicate_rows} duplicate rows",
impact_score="medium",
quick_fix="Drop duplicates if not meaningful.",
)
)

def _check_mixed_data_types(self):
"""Detect columns with mixed dtypes (e.g., numbers + strings)"""
for col in self.df.columns:
types = self.df[col].dropna().map(type).nunique()
if types > 1:
self.issues.append(
Issues(
category="mixed_types",
severity="warning",
column=col,
description=f"Column '{col}' contains mixed data types",
impact_score="low",
quick_fix="Clean or cast to a single type.",
)
)

def _check_outliers(self, z_threshold: float = 4.0):
"""Flag numeric columns with extreme outliers based on Z-score"""
from scipy.stats import zscore

numeric_df = self.df.select_dtypes(include="number").dropna()
if numeric_df.empty:
return

z_scores = (numeric_df - numeric_df.mean()) / numeric_df.std(ddof=0)
for col in numeric_df.columns:
outlier_count = (abs(z_scores[col]) > z_threshold).sum()
if outlier_count > 0:
self.issues.append(
Issues(
category="outliers",
severity="warning",
column=col,
description=f"Column '{col}' has {outlier_count} potential outliers",
impact_score="medium",
quick_fix="Investigate values; consider winsorization or transformations.",
)
)

def _check_feature_correlation(self, threshold: float = 0.95):
"""Detect highly correlated numeric features"""
numeric_df = self.df.select_dtypes(include="number")
if numeric_df.empty:
return

corr_matrix = numeric_df.corr().abs()
upper = corr_matrix.where(np.tril(np.ones(corr_matrix.shape)).astype(bool))
correlated_pairs = [
(col, row, val)
for row in upper.index
for col, val in upper[row].dropna().items()
if val > threshold
]

for col1, col2, corr in correlated_pairs:
self.issues.append(
Issues(
category="feature_correlation",
severity="warning",
column=f"{col1},{col2}",
description=f"Columns '{col1}' and '{col2}' are highly correlated ({corr:.2f})",
impact_score="medium",
quick_fix="Consider dropping one of the correlated features.",
)
)

}

# Simple missingness heatmap structure (list of missing row indexes)
Expand All @@ -329,6 +560,7 @@ def _summarize_missing_values(self):
if self.df[col].isna().any()
}


# =========================================================================
# Generate Summary
# =========================================================================
Expand Down