-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathWordCount.java
More file actions
76 lines (51 loc) · 2.21 KB
/
WordCount.java
File metadata and controls
76 lines (51 loc) · 2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.example;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
/*
* Learning Objectives:
* 1. "Hello World" of Spark application using Java Lambda expression
* 2. Specify JDK version 1.8 in Maven to support Lambda expression
*
*
* */
/**
 * "Hello World" Spark application: counts word occurrences in a text file
 * using Java 8 lambda expressions.
 *
 * <p>Usage: {@code com.example.WordCount <file> [<output path>]}.
 * Prints each distinct (lower-cased) word with its count to stdout and,
 * when an output path is given, also saves the counts there as text.
 */
public final class WordCount {
    // Splits on runs of non-word characters, so punctuation is discarded.
    // Compiled once and reused (Pattern compilation is expensive).
    private static final Pattern SPACE = Pattern.compile("\\W+");
    // SLF4J convention: loggers are per-class constants, so the field is final.
    private static final Logger logger = LoggerFactory.getLogger(WordCount.class);

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            // Fixed: the message previously named a non-existent class
            // "WordCountLambda"; this class is com.example.WordCount.
            System.err.println("Usage: com.example.WordCount <file> [<output path>]");
            System.exit(1);
        }
        // Parameterized logging avoids eager string concatenation.
        logger.info("Input path: {}", args[0]);

        SparkConf sparkConf = new SparkConf().setAppName("WordCount (using Java Lambda)");
        // Allow Spark to overwrite output in an already-existing directory.
        sparkConf.set("spark.hadoop.validateOutputSpecs", "false");
        // If no master was supplied via spark-submit, run locally on all cores.
        sparkConf.setIfMissing("spark.master", "local[*]");

        logger.info("Creating the spark context");
        // try-with-resources guarantees the context is stopped and closed even
        // if the job throws; JavaSparkContext.close() also stops the context,
        // so the original explicit stop()+close() pair was redundant.
        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
            JavaRDD<String> lines = sc.textFile(args[0], 1);
            // Lower-case each line and split it into words. Note: a leading
            // non-word character yields an empty first token, which is then
            // counted like any other key — preserved from the original logic.
            JavaRDD<String> words =
                    lines.flatMap(s -> Arrays.asList(SPACE.split(s.toLowerCase())).iterator());
            JavaPairRDD<String, Integer> ones = words.mapToPair(word -> new Tuple2<>(word, 1));
            // Integer::sum is the idiomatic form of (i1, i2) -> i1 + i2.
            JavaPairRDD<String, Integer> counts = ones.reduceByKey(Integer::sum);

            // collect() pulls all counts to the driver — fine for this demo,
            // but would not scale to very large vocabularies.
            List<Tuple2<String, Integer>> output = counts.collect();
            for (Tuple2<String, Integer> tuple : output) {
                System.out.println(tuple._1() + ": " + tuple._2());
            }
            if (args.length >= 2) {
                logger.info("Saving the output to: {}", args[1]);
                counts.saveAsTextFile(args[1]);
            }
        }
        System.out.println("Process is complete.");
    }
}