-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathTopK_RecommenderGenerator.java
More file actions
106 lines (84 loc) · 3.14 KB
/
TopK_RecommenderGenerator.java
File metadata and controls
106 lines (84 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import java.io.BufferedReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Reads per-user recommendation candidates from a folder on a Hadoop
 * {@link FileSystem}, keeps at most {@code k} candidates per user, and writes
 * the survivors to a local text file.
 */
public class TopK_RecommenderGenerator {

    /**
     * Per-user bounded heaps of raw "movie:rating" entries. The map is reset at
     * the start of every driver run so that separate invocations never mix.
     */
    private static final Map<Integer, PriorityQueue<String>> recommenderMap = new HashMap<>();

    /**
     * Driver: finds the top k movies to recommend to each user.
     *
     * <p>Every non-blank input line is expected to look like
     * {@code user<TAB>movie:rating}, where {@code user} is an integer. For each
     * user a priority queue ordered by {@code myComparator} is maintained; as
     * soon as it holds more than {@code k} entries its head (the least element
     * under that comparator) is discarded.
     *
     * <p>Output lines have the form {@code user[<id>]: <movie>} and are written
     * in the order the entries are polled from each user's queue, i.e. from the
     * least to the greatest element under {@code myComparator}.
     *
     * @param recommenderListPath   folder whose direct children are read as input files
     * @param recommenderResultPath local file the result is written to
     * @param k                     maximum number of movies kept per user; must not be negative
     * @throws IllegalArgumentException if {@code k} is negative
     * @throws Exception                if the input cannot be listed, opened, read or parsed
     */
    public static void TopK_RecommenderGenerator_driver(String recommenderListPath, String recommenderResultPath, int k) throws Exception {
        if (k < 0) {
            throw new IllegalArgumentException("k must be non-negative, got " + k);
        }
        // Drop anything a previous (possibly failed) run left behind.
        recommenderMap.clear();

        Path rootPath = new Path(recommenderListPath);
        FileSystem fs = FileSystem.get(new Configuration());

        for (Path path : getFilesUnderFolder(fs, rootPath)) {
            // try-with-resources closes the reader even when a line fails to parse.
            try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String trimmed = line.trim();
                    if (trimmed.isEmpty()) {
                        // Blank lines carry no record; skip them instead of failing in parseInt.
                        continue;
                    }
                    String[] tokens = trimmed.split("\t");
                    int user = Integer.parseInt(tokens[0]);

                    PriorityQueue<String> pq = recommenderMap.get(user);
                    if (pq == null) {
                        pq = new PriorityQueue<String>(k + 1, new myComparator());
                        recommenderMap.put(user, pq);
                    }
                    // Add first, then trim: the same bound applies to the first
                    // entry of a user as to every later one.
                    pq.add(tokens[1]);
                    if (pq.size() > k) {
                        pq.poll();
                    }
                }
            }
        }

        // Write failures are reported but, as before, do not abort the caller.
        try (FileWriter writer = new FileWriter(recommenderResultPath)) {
            for (Map.Entry<Integer, PriorityQueue<String>> entry : recommenderMap.entrySet()) {
                int userId = entry.getKey();
                PriorityQueue<String> pq = entry.getValue();
                while (!pq.isEmpty()) {
                    // Only the part before ':' (the movie) is emitted.
                    String movie = pq.poll().split(":")[0];
                    StringBuilder sb = new StringBuilder();
                    sb.append("user[").append(userId).append("]: ").append(movie).append("\n");
                    writer.write(sb.toString());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Collects the paths of all entries directly under {@code folderPath}.
     *
     * <p>Entries are taken as returned by {@code fs.listStatus} without any
     * filtering. NOTE(review): this presumably includes sub-directories and
     * marker files as well as data files; confirm against the input layout.
     *
     * @param fs         file system to query
     * @param folderPath folder to list
     * @return the listed paths, or an empty list when {@code folderPath} does not exist
     * @throws IOException if the file system cannot be queried
     */
    private static List<Path> getFilesUnderFolder(FileSystem fs, Path folderPath) throws IOException {
        List<Path> paths = new ArrayList<Path>();
        if (fs.exists(folderPath)) {
            for (FileStatus status : fs.listStatus(folderPath)) {
                paths.add(status.getPath());
            }
        }
        return paths;
    }
}
/**
 * Orders "movie:rating" entries by their rating, ascending.
 *
 * <p>The rating is the text after the first ':' and is compared by numeric
 * value, so "2:10.0" ranks above "1:9.0". Comparing the rating text as a plain
 * string would rank "10.0" below "9.0".
 */
class myComparator implements Comparator<String> {

    /**
     * Compares two entries by numeric rating.
     *
     * @param str1 first entry, formatted as {@code movie:rating}
     * @param str2 second entry, formatted as {@code movie:rating}
     * @return a negative number, zero, or a positive number when the rating of
     *         {@code str1} is less than, equal to, or greater than that of {@code str2}
     * @throws NumberFormatException          if a rating is not a valid number
     * @throws ArrayIndexOutOfBoundsException if an entry contains no ':' separator
     */
    @Override
    public int compare(String str1, String str2) {
        return Double.compare(ratingOf(str1), ratingOf(str2));
    }

    /** Extracts the numeric rating from a {@code movie:rating} entry. */
    private static double ratingOf(String entry) {
        return Double.parseDouble(entry.split(":")[1]);
    }
}