-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathUserClickCountAnalytics.scala
More file actions
69 lines (69 loc) · 2.24 KB
/
UserClickCountAnalytics.scala
File metadata and controls
69 lines (69 loc) · 2.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
//package org.training.spark.streaming
//
//import org.apache.spark.SparkConf
//import org.apache.spark.streaming.Seconds
//import org.apache.spark.streaming.StreamingContext
//import org.apache.spark.streaming.kafka.KafkaUtils
//import kafka.serializer.StringDecoder
//import com.alibaba.fastjson.JSON
//import org.apache.kafka.clients.consumer.ConsumerRecord
//import org.training.spark.util.{KafkaRedisProperties, RedisClient}
//
//object UserClickCountAnalytics {
// def main(args: Array[String]): Unit = {
// val conf = new SparkConf().setAppName("UserClickCountAnalytics")
// if (args.length == 0) {
// conf.setMaster("local[1]")
// }
//
// val ssc = new StreamingContext(conf, Seconds(5))
//
// // Kafka configurations
// val topics = KafkaRedisProperties.KAFKA_USER_TOPIC.split("\\,").toSet
// println(s"Topics: ${topics}.")
//
// val brokers = KafkaRedisProperties.KAFKA_ADDR
// val kafkaParams = Map[String, String](
// "metadata.broker.list" -> brokers,
// "serializer.class" -> "kafka.serializer.StringEncoder"
// )
//
// val clickHashKey = "app::users::click"
//
// // Create a direct stream
// val kafkaStream = KafkaUtils
// .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
//
// val events = kafkaStream.flatMap(line => {
// println(s"Line ${line}.")
// val data = JSON.parseObject(line._2)
// Some(data)
// })
//
// // Compute user click times
// val userClicks = events.map(x => (x.getString("uid"), x.getLong("click_count"))).reduceByKey(_ + _)
// userClicks.foreachRDD(rdd => {
// rdd.foreachPartition(partitionOfRecords => {
// val jedis = RedisClient.pool.getResource
// partitionOfRecords.foreach(pair => {
// try {
// val uid = pair._1
// val clickCount = pair._2
// jedis.hincrBy(clickHashKey, uid, clickCount)
// println(s"Update uid ${uid} to ${clickCount}.")
// } catch {
// case e: Exception => println("error:" + e)
// }
// })
// // destroy jedis object, please notice pool.returnResource is deprecated
// jedis.close()
// })
// })
//
// ssc.start()
// ssc.awaitTermination()
// }
//
//
//
//}