-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathCoOccurenceMatrixGenetator.java
More file actions
82 lines (68 loc) · 2.55 KB
/
CoOccurenceMatrixGenetator.java
File metadata and controls
82 lines (68 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class CoOccurenceMatrixGenetator {
/***
* map method
* Input: userId \t movie1:rating, movie2:rating...
* Output: key = movie1 : movie2, value = 1...
*
***/
public static class MatrixGeneratorMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString().trim();
String[] user_movieRatings = line.split("\t");
String user = user_movieRatings[0];
String[] movie_ratings = user_movieRatings[1].split(",");
for (int i = 0; i < movie_ratings.length; i++) {
String movie1 = movie_ratings[i].trim().split(":")[0];
for (int j = 0; j < movie_ratings.length; j++) {
String movie2 = movie_ratings[j].trim().split(":")[0];
context.write(new Text(movie1 + ":" + movie2), new IntWritable(1));
}
}
}
}
/***
* reduce method
* Input key = movie1:movie2, value = iterable<1,1,1>
*
***/
public static class MatrixGeneratorReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
while (values.iterator().hasNext()) {
sum += Integer.parseInt(values.iterator().next().toString());
}
context.write(key, new IntWritable(sum));
}
}
/***
* Driver
*
***/
public void CoOccurenceMatrixGenetator_driver(String input, String output) throws Exception {
Job job = Job.getInstance();
job.setMapperClass(MatrixGeneratorMapper.class);
job.setReducerClass(MatrixGeneratorReducer.class);
job.setJarByClass(CoOccurenceMatrixGenetator.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
TextInputFormat.setInputPaths(job, new Path(input));
TextOutputFormat.setOutputPath(job, new Path(output));
job.waitForCompletion(true);
}
}