package com.baeldung.data.pipeline;
2+
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;
41+
42+ public class WordCountingAppWithCheckpoint {
43+
44+ public static JavaSparkContext sparkContext ;
45+
46+ @ SuppressWarnings ("serial" )
47+ public static void main (String [] args ) throws InterruptedException {
48+
49+ Logger .getLogger ("org" )
50+ .setLevel (Level .OFF );
51+ Logger .getLogger ("akka" )
52+ .setLevel (Level .OFF );
53+
54+ Map <String , Object > kafkaParams = new HashMap <>();
55+ kafkaParams .put ("bootstrap.servers" , "localhost:9092" );
56+ kafkaParams .put ("key.deserializer" , StringDeserializer .class );
57+ kafkaParams .put ("value.deserializer" , StringDeserializer .class );
58+ kafkaParams .put ("group.id" , "use_a_separate_group_id_for_each_stream" );
59+ kafkaParams .put ("auto.offset.reset" , "latest" );
60+ kafkaParams .put ("enable.auto.commit" , false );
61+
62+ Collection <String > topics = Arrays .asList ("messages" );
63+
64+ SparkConf sparkConf = new SparkConf ();
65+ sparkConf .setMaster ("local[2]" );
66+ sparkConf .setAppName ("WordCountingAppWithCheckpoint" );
67+ sparkConf .set ("spark.cassandra.connection.host" , "127.0.0.1" );
68+
69+ JavaStreamingContext streamingContext = new JavaStreamingContext (sparkConf , Durations .seconds (1 ));
70+
71+ sparkContext = streamingContext .sparkContext ();
72+
73+ streamingContext .checkpoint ("./.checkpoint" );
74+
75+ JavaInputDStream <ConsumerRecord <String , String >> messages = KafkaUtils .createDirectStream (streamingContext , LocationStrategies .PreferConsistent (), ConsumerStrategies .<String , String > Subscribe (topics , kafkaParams ));
76+
77+ JavaPairDStream <String , String > results = messages .mapToPair (new PairFunction <ConsumerRecord <String , String >, String , String >() {
78+ @ Override
79+ public Tuple2 <String , String > call (ConsumerRecord <String , String > record ) {
80+ return new Tuple2 <>(record .key (), record .value ());
81+ }
82+ });
83+
84+ JavaDStream <String > lines = results .map (new Function <Tuple2 <String , String >, String >() {
85+ @ Override
86+ public String call (Tuple2 <String , String > tuple2 ) {
87+ return tuple2 ._2 ();
88+ }
89+ });
90+
91+ JavaDStream <String > words = lines .flatMap (new FlatMapFunction <String , String >() {
92+ @ Override
93+ public Iterator <String > call (String x ) {
94+ return Arrays .asList (x .split ("\\ s+" ))
95+ .iterator ();
96+ }
97+ });
98+
99+ JavaPairDStream <String , Integer > wordCounts = words .mapToPair (new PairFunction <String , String , Integer >() {
100+ @ Override
101+ public Tuple2 <String , Integer > call (String s ) {
102+ return new Tuple2 <>(s , 1 );
103+ }
104+ })
105+ .reduceByKey (new Function2 <Integer , Integer , Integer >() {
106+ @ Override
107+ public Integer call (Integer i1 , Integer i2 ) {
108+ return i1 + i2 ;
109+ }
110+ });
111+
112+ Function3 <String , Optional <Integer >, State <Integer >, Tuple2 <String , Integer >> mappingFunc = (word , one , state ) -> {
113+ int sum = one .orElse (0 ) + (state .exists () ? state .get () : 0 );
114+ Tuple2 <String , Integer > output = new Tuple2 <>(word , sum );
115+ state .update (sum );
116+ return output ;
117+ };
118+
119+ JavaPairRDD <String , Integer > initialRDD = JavaPairRDD .fromJavaRDD (sparkContext .emptyRDD ());
120+
121+ JavaMapWithStateDStream <String , Integer , Integer , Tuple2 <String , Integer >> cumulativeWordCounts = wordCounts .mapWithState (StateSpec .function (mappingFunc )
122+ .initialState (initialRDD ));
123+
124+ cumulativeWordCounts .foreachRDD (new VoidFunction <JavaRDD <Tuple2 <String , Integer >>>() {
125+ @ Override
126+ public void call (JavaRDD <Tuple2 <String , Integer >> javaRdd ) throws Exception {
127+ List <Tuple2 <String , Integer >> wordCountList = javaRdd .collect ();
128+ for (Tuple2 <String , Integer > tuple : wordCountList ) {
129+ List <Word > words = Arrays .asList (new Word (tuple ._1 , tuple ._2 ));
130+ JavaRDD <Word > rdd = sparkContext .parallelize (words );
131+ javaFunctions (rdd ).writerBuilder ("vocabulary" , "words" , mapToRow (Word .class ))
132+ .saveToCassandra ();
133+ }
134+ }
135+ });
136+
137+ streamingContext .start ();
138+ streamingContext .awaitTermination ();
139+ }
140+ }