Finding the most popular superhero from the given data sets in Scala:
scala> val names = sc.textFile("C:\\SparkScala\\Marvel-names.txt")
names: org.apache.spark.rdd.RDD[String] = C:\SparkScala\Marvel-names.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> def parsenames(line: String) : Option[(Int, String)] = {
| var fields = line.split('\"')
| if (fields.length > 1){
| return Some(fields(0).trim().toInt,fields(1)) }
| else {
| return None
| } }
parsenames: (line: String)Option[(Int, String)]
scala> val namesrdd = names.flatMap(parsenames)
namesrdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[2] at flatMap at <console>:27
scala> namesrdd.glom.collect()
res0: Array[Array[(Int, String)]] = Array(Array((1,24-HOUR MAN/EMMANUEL), (2,3-D MAN/CHARLES CHAN), (3,4-D MAN/MERCURIO), (4,8-BALL/), (5,A), (6,A’YIN), (7,ABBOTT, JACK), (8,ABCISSA), (9,ABEL), (10,ABOMINATION/EMIL BLO), (11,ABOMINATION | MUTANT), (12,ABOMINATRIX), (13,ABRAXAS), (14,ADAM 3,031), (15,ABSALOM), (16,ABSORBING MAN/CARL C), (17,ABSORBING MAN | MUTA), (18,ACBA), (19,ACHEBE, REVEREND DOC), (20,ACHILLES), (21,ACHILLES II/HELMUT), (22,ACROBAT/CARL ZANTE), (23,ADAM X), (24,ADAMS, CINDY), (25,ADAMS, CONGRESSMAN H), (26,ADAMS, GEORGE), (27,ADAMS, MARTHA), (28,ADAMS, NICOLE NIKKI), (29,ADAMSON, JASON), (30,ADAMSON, REBECCA), (31,ADMIRAL PROTOCOL/), (32,ADORA), (33,ADORA CLONE), (34,ADRIA), (35,ADVA), (36,ADVENT/KYLE GROBE), (37,ADVERSARY), (38,AEGIS/TREY ROLLINS), (39,AENTAROS), (40…
scala> val lines = sc.textFile("C:\\SparkScala\\Marvel-graph.txt")
lines: org.apache.spark.rdd.RDD[String] = C:\SparkScala\Marvel-graph.txt MapPartitionsRDD[5] at textFile at <console>:24
scala> def countcon(line:String) = {
| var elements =line.split("\\s+")
| ( elements(0).toInt, elements.length - 1 )
| }
countcon: (line: String)(Int, Int)
scala> val pairings = lines.map(countcon)
pairings: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[6] at map at <console>:27
scala> pairings.glom.collect()
res1: Array[Array[(Int, Int)]] = Array(Array((5988,48), (5989,40), (5982,42), (5983,14), (5980,24), (5981,17), (5986,142), (5987,81), (5984,41), (5985,19), (6294,13), (270,42), (271,9), (272,45), (273,58), (274,410), (275,47), (276,15), (277,16), (278,123), (279,31), (2143,19), (2142,222), (3519,12), (3518,1), (3513,33), (3512,11), (3511,56), (3510,19), (3517,13), (3516,11), (3515,200), (3514,29), (2688,39), (2689,330), (2684,18), (2685,24), (2686,7), (2687,9), (2680,49), (2681,14), (2682,18), (2683,21), (99,118), (98,18), (91,85), (90,11), (93,38), (92,19), (95,6), (94,59), (97,15), (96,21), (1177,206), (1176,13), (1175,126), (1174,4), (1173,8), (1172,65), (1171,2), (1170,6), (1179,35), (1178,70), (876,142), (877,10), (3432,13), (623,8), (875,501), (875,31), (622,17), (872,51), (1225,2…
scala> val totalfriendsbychar = pairings.reduceByKey( (x,y) => x+y)
totalfriendsbychar: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[8] at reduceByKey at <console>:25
scala> totalfriendsbychar.glom.collect()
res3: Array[Array[(Int, Int)]] = Array(Array((4904,68), (1084,263), (384,21), (6400,15), (3702,17), (6308,89), (5618,19), (5354,88), (1894,8), (4926,11), (5384,17), (140,7), (204,36), (956,41), (5928,5), (4992,5), (3706,17), (2334,27), (6028,24), (4260,5), (5638,9), (2506,393), (4224,4), (450,16), (160,9), (1596,48), (1780,39), (5136,26), (4582,14), (3932,32), (2346,10), (2904,56), (4628,25), (1500,11), (4724,9), (548,15), (1732,100), (2602,124), (528,171), (1550,5), (6006,210), (2938,24), (1716,15), (2912,4), (1310,94), (1202,32), (2192,22), (2718,24), (196,8), (2706,7), (3118,21), (5632,9), (5090,4), (3296,239), (3568,5), (5204,44), (1144,29), (1670,33), (4228,382), (4304,33), (5506,76), (5736,1289), (4156,24), (5898,7), (5648,55), (4424,8), (2180,1), (266,8), (1344,12), (5746,20), (2…
scala> val flipped = totalfriendsbychar.map(x=> (x._2,x._1))
flipped: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[11] at map at <console>:25
scala> val mostpopular = flipped.max()
mostpopular: (Int, Int) = (1933,859)
scala> val mostpopularname = namesrdd.lookup(mostpopular._2)(0)
mostpopularname: String = CAPTAIN AMERICA