|
| 1 | +from pyspark.ml import Pipeline, PipelineModel |
| 2 | +from pyspark.ml.classification import GBTClassifier |
| 3 | +from pyspark.ml.evaluation import MulticlassClassificationEvaluator |
| 4 | +from pyspark.ml.feature import IndexToString, StringIndexer, VectorAssembler, VectorIndexer |
| 5 | +from pyspark.sql import SparkSession |
| 6 | +from pyspark.sql.functions import expr, year, col |
| 7 | + |
| 8 | + |
def train_and_predict():
    """Train a GBT pass/fail classifier on UK MOT test data.

    Workflow:
      1. Stage the raw DfT 2020 CSV extract as parquet (one-off conversion).
      2. Load normal tests ('NT') with a definite result ('F'/'P'),
         restricted to FORD FIESTA, and build the feature vector.
      3. Fit (or load) a gradient-boosted-tree pipeline, write predictions
         to /tmp/output.csv and print held-out accuracy and error.
    """
    spark = (SparkSession.builder
             .master("local[*]")
             .appName("MOT Test result prediction")
             .getOrCreate())
    spark.sparkContext.setLogLevel("ERROR")

    source_files_path = "/home/user/download/dft_test_result_2020"
    parquet_path = "/home/user/motdata"
    model_path = "/home/user/models/spark-rf/model"

    _stage_csv_as_parquet(spark, source_files_path, parquet_path)

    feature_cols = ["test_mileage", "fuel_type", "year", "cylinder_capacity"]
    df, with_features_vector_df = _build_feature_frame(spark, parquet_path, feature_cols)

    # Automatically identify categorical features, and index them.
    # Set maxCategories so features with > 4 distinct values are treated as continuous.
    feature_indexer = (VectorIndexer()
                       .setInputCol("features")
                       .setOutputCol("indexedFeatures")
                       .setMaxCategories(4)
                       .fit(with_features_vector_df))

    # Split the data into training and test sets (30% held out for testing).
    (training_data, test_data) = with_features_vector_df.randomSplit([0.7, 0.3])

    # Train a Gradient boost model over the indexed feature vector.
    gb = (GBTClassifier()
          .setLabelCol("indexedTestResult")
          .setFeaturesCol("indexedFeatures")
          .setMaxIter(10)
          .setFeatureSubsetStrategy("auto"))

    # Map 'P'/'F' to a numeric label; fitted on the full frame so the
    # label mapping is independent of the random split.
    label_indexer = (StringIndexer()
                     .setInputCol("test_result")
                     .setOutputCol("indexedTestResult")
                     .fit(df))

    # Convert indexed labels back to original labels.
    label_converter = (IndexToString()
                       .setInputCol("prediction")
                       .setOutputCol("predictedTestResult")
                       .setLabels(list(label_indexer.labelsArray[0])))

    # Chain indexers, classifier and label converter in a Pipeline.
    pipeline = Pipeline().setStages([label_indexer, feature_indexer, gb, label_converter])

    use_saved_model = False
    model: PipelineModel
    if use_saved_model:
        model = PipelineModel.load(model_path)
    else:
        # Train model. This also runs the indexers.
        model = pipeline.fit(training_data)
        model.write().overwrite().save(model_path)

    # Make predictions on the held-out split.
    predictions = model.transform(test_data)

    # Persist (predicted label, true label, raw features) for inspection.
    output_cols = ["predictedTestResult", "test_result", *feature_cols]
    (predictions
     .repartition(1)
     .selectExpr(*output_cols)
     .write
     .mode("overwrite")
     .csv("/tmp/output.csv"))

    # Select (prediction, true label) and compute test error.
    evaluator = (MulticlassClassificationEvaluator()
                 .setLabelCol("indexedTestResult")
                 .setPredictionCol("prediction")
                 .setMetricName("accuracy"))
    accuracy = evaluator.evaluate(predictions)
    print(f"Test Accuracy = {accuracy}")
    print(f"Test Error = {(1.0 - accuracy)}")


def _stage_csv_as_parquet(spark, source_path, parquet_path):
    """Convert the raw DfT CSV extract to parquet at *parquet_path*.

    Repartitions by (year, make, model) so the per-model filters applied
    later touch fewer files.

    Bug fix: the original called ``saveAsTable(parquet_path)``, but
    ``saveAsTable`` expects a metastore table *name*; the data is read back
    with ``spark.read.parquet(parquet_path)``, so the write must target the
    filesystem path via ``save()``.
    """
    mot_data = spark.read.option("header", "true").csv(source_path)
    (mot_data
     .withColumn("year", year(col("first_use_date")))
     .repartition(col("year"), col("make"), col("model"))
     .write
     .format("parquet")
     .save(parquet_path))


def _build_feature_frame(spark, parquet_path, feature_cols):
    """Load and prepare the modelling frame.

    Returns a pair ``(df, with_features_vector_df)``:
      * ``df`` — FORD FIESTA normal tests with per-column ``indexed_*``
        features (also used to fit the label indexer);
      * ``with_features_vector_df`` — the same rows with the indexed
        features assembled into a single 'features' vector column.
    """
    df = (spark.read
          .parquet(parquet_path)
          .filter("""test_type = 'NT'
            and test_result in ('F', 'P') """))

    df = (df
          .drop("colour", "vehicle_id", "test_id", "test_date", "test_class_id", "test_type", "postcode_area",
                "first_use_date")
          .withColumn("indexed_test_mileage", expr("int(test_mileage)"))
          .withColumn("indexed_year", expr("int(year)"))
          .withColumn("indexed_cylinder_capacity", expr("int(cylinder_capacity)"))
          .filter("make = 'FORD'")
          .filter("model = 'FIESTA'"))

    # Drop rows with a missing value in any feature column.
    for feature in feature_cols:
        df = df.filter(f"{feature} is not null")

    # Eager local checkpoint truncates the lineage before the repeated fits.
    df = df.localCheckpoint(True)

    # Index the string-typed categorical columns into indexed_<col>.
    for string_col in ["make", "model", "fuel_type"]:
        indexer = (StringIndexer()
                   .setInputCol(string_col)
                   .setOutputCol(f"indexed_{string_col}"))
        df = indexer.fit(df).transform(df)

    idx_feature_cols = [f"indexed_{c}" for c in feature_cols]
    features_assembler = (VectorAssembler()
                          .setInputCols(idx_feature_cols)
                          .setOutputCol("features"))

    with_features_vector_df = (features_assembler
                               .transform(df)
                               .drop(*idx_feature_cols)
                               .localCheckpoint(True))
    return df, with_features_vector_df
| 126 | + |
| 127 | + |
# Script entry point: run the full train-and-predict workflow.
if __name__ == "__main__":
    train_and_predict()