Skip to content

Commit 23ac4c8

Browse files
Add Spark ML Gradient Boost
1 parent bf05d84 commit 23ac4c8

File tree

2 files changed

+129
-0
lines changed

2 files changed

+129
-0
lines changed

spark_ml_gradient_boost/__init__.py

Whitespace-only changes.
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
from pyspark.ml import Pipeline, PipelineModel
2+
from pyspark.ml.classification import GBTClassifier
3+
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
4+
from pyspark.ml.feature import IndexToString, StringIndexer, VectorAssembler, VectorIndexer
5+
from pyspark.sql import SparkSession
6+
from pyspark.sql.functions import expr, year, col
7+
8+
9+
def train_and_predict(
        source_files_path="/home/user/download/dft_test_result_2020",
        parquet_path="/home/user/motdata",
        model_path="/home/user/models/spark-rf/model",
        use_saved_model=False):
    """Train a Spark ML gradient-boosted-tree classifier on MOT test data
    and report its accuracy on a held-out split.

    The raw CSV export is rewritten as Parquet, filtered to Ford Fiesta
    normal tests with a definite pass/fail result, and fed through a
    pipeline of (label indexer, vector indexer, GBT classifier, label
    decoder).  70% of rows train the model, 30% evaluate it; predictions
    are written to /tmp/output.csv and the accuracy is printed.

    Parameters
    ----------
    source_files_path : str
        Directory containing the raw MOT result CSV files (with header).
    parquet_path : str
        Location where the repartitioned Parquet copy of the data is kept.
    model_path : str
        Location where the fitted PipelineModel is saved to / loaded from.
    use_saved_model : bool
        When True, load a previously saved model from ``model_path``
        instead of fitting (and overwriting) a new one.
    """
    spark = (SparkSession.builder
             .master("local[*]")
             .appName("MOT Test result prediction")
             .getOrCreate())
    spark.sparkContext.setLogLevel("ERROR")
    try:
        mot_data = spark.read.option("header", "true").csv(source_files_path)
        # BUG FIX: the original called ``saveAsTable`` with a filesystem
        # path, which is not a valid table identifier; the data is read
        # back below with ``spark.read.parquet``, so write plain Parquet
        # to that path.  ``overwrite`` makes the ingest step idempotent
        # across runs.
        (mot_data
         .withColumn("year", year(col("first_use_date")))
         .repartition(col("year"), col("make"), col("model"))
         .write
         .mode("overwrite")
         .parquet(parquet_path))

        # Keep only normal tests (NT) with a definite pass/fail outcome.
        df = (spark.read
              .parquet(parquet_path)
              .filter("""test_type = 'NT'
                  and test_result in ('F', 'P') """))

        # Drop identifying / unused columns, cast the numeric features,
        # and restrict to a single make/model so the sample is homogeneous.
        df = (df
              .drop("colour", "vehicle_id", "test_id", "test_date",
                    "test_class_id", "test_type", "postcode_area",
                    "first_use_date")
              .withColumn("indexed_test_mileage", expr("int(test_mileage)"))
              .withColumn("indexed_year", expr("int(year)"))
              .withColumn("indexed_cylinder_capacity",
                          expr("int(cylinder_capacity)"))
              .filter("make = 'FORD'")
              .filter("model = 'FIESTA'"))

        string_cols = ["make", "model", "fuel_type"]
        feature_cols = ["test_mileage", "fuel_type", "year",
                        "cylinder_capacity"]

        def get_string_indexer(c) -> StringIndexer:
            # Map string column ``c`` onto a numeric "indexed_<c>" column.
            return (StringIndexer()
                    .setInputCol(c)
                    .setOutputCol(f"indexed_{c}"))

        # Rows with a null feature cannot be assembled into a vector.
        for feature in feature_cols:
            df = df.filter(f"{feature} is not null")

        # Truncate the lineage so the repeated fits below do not re-run
        # the whole read/filter chain.
        df = df.localCheckpoint(True)

        for string_col in string_cols:
            df = get_string_indexer(string_col).fit(df).transform(df)

        idx_feature_cols = [f"indexed_{c}" for c in feature_cols]
        features_assembler = (VectorAssembler()
                              .setInputCols(idx_feature_cols)
                              .setOutputCol("features"))

        with_features_vector_df = (features_assembler
                                   .transform(df)
                                   .drop(*idx_feature_cols)
                                   .localCheckpoint(True))

        # Automatically identify categorical features, and index them.
        # Set maxCategories so features with > 4 distinct values are
        # treated as continuous.
        feature_indexer = (VectorIndexer()
                           .setInputCol("features")
                           .setOutputCol("indexedFeatures")
                           .setMaxCategories(4)
                           .fit(with_features_vector_df))

        # Split the data into training and test sets (30% held out for
        # testing).
        (training_data, test_data) = with_features_vector_df.randomSplit(
            [0.7, 0.3])

        # Gradient-boosted tree classifier over the indexed features.
        gb = (GBTClassifier()
              .setLabelCol("indexedTestResult")
              .setFeaturesCol("indexedFeatures")
              .setMaxIter(10)
              .setFeatureSubsetStrategy("auto"))

        label_indexer = (StringIndexer()
                         .setInputCol("test_result")
                         .setOutputCol("indexedTestResult")
                         .fit(df))

        # Convert indexed labels back to original labels.
        label_converter = (IndexToString()
                           .setInputCol("prediction")
                           .setOutputCol("predictedTestResult")
                           .setLabels(list(label_indexer.labelsArray[0])))

        # Chain the indexers and the GBT classifier in a Pipeline.
        pipeline = Pipeline().setStages(
            [label_indexer, feature_indexer, gb, label_converter])

        # Train model (this also runs the indexers), or load a saved one.
        model: PipelineModel
        if use_saved_model:
            model = PipelineModel.load(model_path)
        else:
            model = pipeline.fit(training_data)
            model.write().overwrite().save(model_path)

        # Make predictions on the held-out data.
        predictions = model.transform(test_data)

        # Persist (prediction, true label, features) for inspection.
        output_cols = ["predictedTestResult", "test_result", *feature_cols]
        (predictions
         .repartition(1)
         .selectExpr(*output_cols)
         .write
         .mode("overwrite")
         .csv("/tmp/output.csv"))

        # Select (prediction, true label) and compute test error.
        evaluator = (MulticlassClassificationEvaluator()
                     .setLabelCol("indexedTestResult")
                     .setPredictionCol("prediction")
                     .setMetricName("accuracy"))
        accuracy = evaluator.evaluate(predictions)
        print(f"Test Accuracy = {accuracy}")
        print(f"Test Error = {(1.0 - accuracy)}")
    finally:
        # Always release the local Spark context and its resources,
        # even when a stage above fails.
        spark.stop()
126+
127+
128+
# Script entry point: run the full train-and-evaluate workflow when the
# module is executed directly (not when imported).
if __name__ == "__main__":
    train_and_predict()

0 commit comments

Comments
 (0)