# Optuna hyperparameter search over a SynapseML LightGBMClassifier on Spark,
# with a nested CrossValidator grid per trial and a final early-stopped refit.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("optuna-synapseml-lightgbm-with-cv")
         # If needed, add the SynapseML package:
         # .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.11.2")
         .getOrCreate())
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from synapse.ml.lightgbm import LightGBMClassifier # <- SynapseML estimator
import optuna
from optuna.exceptions import TrialPruned
import pandas as pd
from sklearn.datasets import make_classification
import math
import traceback
SEED = 7
LGBM_NUM_THREADS = 4 # tune per your executor cores; 1..N
CV_NUM_FOLDS = 3
CV_PARALLELISM = 2  # safe value; increase if the cluster has spare cores
HPO_N_TRIALS = 30
HPO_N_JOBS = 1  # keep at 1 by default; >1 runs multiple fits concurrently from the same SparkSession (fragile)
USE_OPTUNA_PRUNER = True  # enables pruning on bad intermediate results
GRID_SPAN_FRAC = 0.15  # ±15% around the trial suggestion for fractional params
GRID_NEIGHBORS_INT = 1  # integer neighborhood around the trial suggestion (e.g., +/-1)
X, y = make_classification(
    n_samples=20000,
    n_features=40,
    n_informative=10,
    n_redundant=10,
    n_repeated=0,
    n_classes=2,
    weights=[0.6, 0.4],
    random_state=SEED,
)
pdf = pd.DataFrame(X, columns=[f"f{i}" for i in range(X.shape[1])])
pdf["label"] = y.astype("int32")
df = spark.createDataFrame(pdf)
feature_cols = [c for c in df.columns if c != "label"]
# Three-way split: train for CV, valid for hold-out scoring and early stopping,
# test for the final report (sampling a test set from df later would leak training rows).
train_df, valid_df, test_df = df.randomSplit([0.7, 0.15, 0.15], seed=SEED)
train_df = train_df.persist(StorageLevel.MEMORY_AND_DISK)
valid_df = valid_df.persist(StorageLevel.MEMORY_AND_DISK)
_ = train_df.count()
_ = valid_df.count()  # materialize the cached splits
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
def safe_cv_folds(df, requested_folds: int) -> int:
    """Reduce folds if the dataset is too small or too imbalanced to support requested_folds."""
    total = df.count()
    if total < requested_folds:
        return max(2, min(3, total))  # fall back to 2 or 3 folds
    # Crude class counts (computed once here to avoid an expensive groupBy on every trial).
    pos = df.filter(F.col("label") == 1).count()
    neg = total - pos
    min_class = min(pos, neg)
    # Need at least one example of each class per fold.
    if min_class < requested_folds:
        return max(2, min(3, min_class))  # shrink folds to fit the minority class
    return requested_folds
SAFE_CV_NUM_FOLDS = safe_cv_folds(train_df, CV_NUM_FOLDS)
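# With this synthetic set (20k rows, 60/40 classes), the ~14k-row train split holds far
# more minority examples than CV_NUM_FOLDS, so the requested 3 folds survive unchanged.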
def _bounded_frac(center: float, span: float):
    """Return a small sorted grid of fractions around `center`, clipped to [0.05, 0.999]."""
    lo = max(0.05, center * (1.0 - span))
    hi = min(0.999, center * (1.0 + span))
    # Round, deduplicate via a set, and sort to guarantee unique grid points.
    return sorted({round(lo, 6), round(center, 6), round(hi, 6)})
def _neighbors_int(center: int, k: int, lo: int, hi: int):
    """Return `center` plus up to k integer neighbors on each side, clipped to [lo, hi]."""
    vals = {center}
    for d in range(1, k + 1):
        if center - d >= lo:
            vals.add(center - d)
        if center + d <= hi:
            vals.add(center + d)
    return sorted(vals)
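# Quick sanity checks, worked through from the helper definitions above:
assert _bounded_frac(0.8, 0.15) == [0.68, 0.8, 0.92]          # 0.8*0.85 and 0.8*1.15
assert _bounded_frac(0.95, 0.15) == [0.8075, 0.95, 0.999]     # upper edge clipped to 0.999
assert _neighbors_int(32, 1, lo=16, hi=1024) == [31, 32, 33]  # one neighbor on each side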
def objective(trial: optuna.Trial) -> float:
    """
    Optuna objective:
      - Suggest a set of LightGBM hyperparameters.
      - Build a small CV grid around a subset (featureFraction, baggingFraction, numLeaves).
      - Run a Spark CrossValidator on the training partition.
      - Evaluate the best CV model on the hold-out valid_df (AUC).
      - Return the AUC (the study is set to maximize), with robust error handling.
    """
    # Optional pruner support: seed an initial report so the pruner can kick in.
    if USE_OPTUNA_PRUNER:
        trial.report(0.0, step=0)
    try:
        # --- 4.1) Sample base hyperparameters (camelCase for SynapseML) ------
        numLeaves = trial.suggest_int("numLeaves", 32, 512, log=True)
        maxDepth = trial.suggest_int("maxDepth", -1, 16)  # -1 means no depth limit
        learningRate = trial.suggest_float("learningRate", 1e-3, 0.3, log=True)
        minSumHessianInLeaf = trial.suggest_float("minSumHessianInLeaf", 1e-3, 10.0, log=True)
        lambdaL1 = trial.suggest_float("lambdaL1", 0.0, 10.0)
        lambdaL2 = trial.suggest_float("lambdaL2", 0.0, 10.0)
        featureFraction = trial.suggest_float("featureFraction", 0.5, 1.0)
        baggingFraction = trial.suggest_float("baggingFraction", 0.5, 1.0)
        baggingFreq = trial.suggest_int("baggingFreq", 0, 10)
        numIterations = trial.suggest_int("numIterations", 300, 1500)  # CV has no early stopping: keep bounded
        # Per-trial training parallelization: per-executor threads, capped at LGBM_NUM_THREADS.
        numThreads = trial.suggest_int("numThreads", 1, max(1, LGBM_NUM_THREADS))
        # --- 4.2) Define the base estimator ---------------------------------
        lgbm = LightGBMClassifier(
            objective="binary",
            featuresCol="features",
            labelCol="label",
            # NOTE: do NOT set validationIndicatorCol inside CV (CV manages its own folds).
            numLeaves=numLeaves,
            maxDepth=maxDepth,
            minSumHessianInLeaf=minSumHessianInLeaf,
            lambdaL1=lambdaL1,
            lambdaL2=lambdaL2,
            learningRate=learningRate,
            featureFraction=featureFraction,
            baggingFraction=baggingFraction,
            baggingFreq=baggingFreq,
            numIterations=numIterations,
            numThreads=numThreads,
            seed=SEED,
            # Stability knobs (optional):
            # useBarrierExecutionMode=True,  # enable on K8s/YARN if you see hanging tasks
            # chunkSize=10,
            # numBatches=0,
        )
        pipeline = Pipeline(stages=[assembler, lgbm])

        # --- 4.3) Build a *local* grid around the trial suggestions ----------
        # Keep the grid small to avoid exploding runtime: each grid point is a full distributed train.
        ff_grid = _bounded_frac(featureFraction, GRID_SPAN_FRAC)  # around featureFraction
        bf_grid = _bounded_frac(baggingFraction, GRID_SPAN_FRAC)  # around baggingFraction
        nl_grid = _neighbors_int(numLeaves, GRID_NEIGHBORS_INT, lo=16, hi=1024)  # around numLeaves
        paramGrid = (ParamGridBuilder()
                     .addGrid(lgbm.featureFraction, ff_grid)
                     .addGrid(lgbm.baggingFraction, bf_grid)
                     .addGrid(lgbm.numLeaves, nl_grid)
                     .build())
        # --- 4.4) CrossValidator on the training split ------------------------
        # `parallelism` controls how many grid points run simultaneously (each one a Spark job).
        cv = CrossValidator(
            estimator=pipeline,
            estimatorParamMaps=paramGrid,
            evaluator=evaluator,
            numFolds=SAFE_CV_NUM_FOLDS,
            parallelism=CV_PARALLELISM,
            seed=SEED,
        )
        cv_model = cv.fit(train_df)  # k-fold CV on the train partition only
        # --- 4.5) Evaluate the best model on the hold-out validation ---------
        best_model = cv_model.bestModel
        preds_valid = best_model.transform(valid_df)
        auc = evaluator.evaluate(preds_valid)
        # Report to Optuna (enables pruning if configured).
        trial.report(auc, step=1)
        if USE_OPTUNA_PRUNER and trial.should_prune():
            raise TrialPruned(f"Pruned (AUC={auc:.4f})")
        return auc  # the study is set to 'maximize' below
    except TrialPruned:
        # Proper pruning: lets Optuna drop unpromising trials early.
        raise
    except Exception as e:
        # Robust error handling: capture the trace for later inspection, then prune.
        trial.set_user_attr("exception", repr(e))
        trial.set_user_attr("traceback", traceback.format_exc())
        # Prune rather than return a poisonous metric; this keeps the study healthy.
        raise TrialPruned(f"Pruned due to exception: {repr(e)}") from e
sampler = optuna.samplers.TPESampler(seed=SEED)
# Note: the objective only reports at steps 0 and 1, so a step-based warm-up of 5 would
# disable pruning entirely; warm up on startup trials instead.
pruner = (optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=0)
          if USE_OPTUNA_PRUNER else optuna.pruners.NopPruner())
study = optuna.create_study(direction="maximize", sampler=sampler, pruner=pruner, study_name="lgbm_optuna_cv_nested")
study.optimize(objective, n_trials=HPO_N_TRIALS, n_jobs=HPO_N_JOBS, show_progress_bar=True)
print("Best AUC (CV→valid):", study.best_value) print("Best (trial-level) params:", study.best_params)
train_ev_df = (train_df.withColumn("val", F.lit(False))
               .unionByName(valid_df.withColumn("val", F.lit(True)))
               .persist(StorageLevel.MEMORY_AND_DISK))
_ = train_ev_df.count()  # materialize
# Reuse the best trial params as a baseline; you may also merge the winning grid values
# from the best trial's CrossValidatorModel, if you kept it (see the sketch below).
best = study.best_params.copy()
best.update({
    "objective": "binary",
    "featuresCol": "features",
    "labelCol": "label",
    "validationIndicatorCol": "val",
    "earlyStoppingRound": 50,  # typical default; adjust as needed
    "numIterations": max(500, best.get("numIterations", 500)),  # allow early stopping to kick in
    "numThreads": best.get("numThreads", LGBM_NUM_THREADS),
    "seed": SEED,
})
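# A minimal sketch of that optional merge, assuming the best trial's CrossValidatorModel
# was kept in a (hypothetical) `best_cv_model` variable; the winning param map then
# overrides the trial-level suggestions:
#
#   import numpy as np
#   best_idx = int(np.argmax(best_cv_model.avgMetrics))    # AUC: higher is better
#   best_map = best_cv_model.getEstimatorParamMaps()[best_idx]
#   best.update({p.name: v for p, v in best_map.items()})  # featureFraction, baggingFraction, numLeaves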
final_lgbm = LightGBMClassifier(**best)
final_pipeline = Pipeline(stages=[assembler, final_lgbm])
final_model = final_pipeline.fit(train_ev_df)
preds_test = final_model.transform(test_df)  # test_df was held out before any tuning
test_auc = evaluator.evaluate(preds_test)
print("Final Test AUC (early-stopped):", test_auc)
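# Optional cleanup: release the cached splits; stop the session only when running as a
# standalone script (skip spark.stop() in shared notebook environments).
for cached_df in (train_df, valid_df, train_ev_df):
    cached_df.unpersist()
# spark.stop()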