Skip to content

Commit 5bdd92a

Browse files
Changes related to Apache Spark GraphX article
1 parent 68b080e commit 5bdd92a

6 files changed

Lines changed: 257 additions & 0 deletions

File tree

apache-spark/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@
4040
<version>${org.apache.spark.spark-mllib.version}</version>
4141
<scope>provided</scope>
4242
</dependency>
43+
<dependency>
44+
<groupId>org.apache.spark</groupId>
45+
<artifactId>spark-graphx_2.12</artifactId>
46+
<version>${org.apache.spark.spark-mllib.version}</version>
47+
<scope>provided</scope>
48+
</dependency>
4349
<dependency>
4450
<groupId>org.apache.spark</groupId>
4551
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.baeldung.graphx;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
import org.apache.log4j.Level;
7+
import org.apache.log4j.Logger;
8+
import org.apache.spark.api.java.function.VoidFunction;
9+
import org.apache.spark.graphx.Graph;
10+
import org.apache.spark.graphx.VertexRDD;
11+
import org.apache.spark.graphx.lib.PageRank;
12+
13+
import scala.Tuple2;
14+
15+
public class GraphAlgorithms {
16+
public static Map<Long, User> USERS = new HashMap<>();
17+
18+
public static void main(String[] args) {
19+
Logger.getLogger("org").setLevel(Level.OFF);
20+
21+
GraphLoader loader = new GraphLoader();
22+
Graph<User, Relationship> graph = loader.mapUserRelationship();
23+
24+
Graph<Object, Object> pageRank = PageRank.run(graph, 20, 0.0001, GraphLoader.USER_TAG,
25+
GraphLoader.RELATIONSHIP_TAG);
26+
27+
VertexRDD<Object> usersRDD = pageRank.vertices();
28+
29+
System.out.println("---- PageRank: ");
30+
System.out.println("- Users Ranked ");
31+
usersRDD.toJavaRDD()
32+
.foreach((VoidFunction<Tuple2<Object, Object>>) tuple -> System.out.println(tuple.toString()));
33+
34+
System.out.println("---- Connected Components: ");
35+
Graph<Object, Relationship> connectedComponents = graph.ops().connectedComponents();
36+
37+
connectedComponents.vertices().toJavaRDD()
38+
.foreach((VoidFunction<Tuple2<Object, Object>>) tuple -> System.out.println(tuple.toString()));
39+
40+
System.out.println("---- Triangle Count: ");
41+
Graph<Object, Relationship> triangleCount = graph.ops().triangleCount();
42+
43+
triangleCount.vertices().toJavaRDD()
44+
.foreach((VoidFunction<Tuple2<Object, Object>>) tuple -> System.out.println(tuple.toString()));
45+
}
46+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package com.baeldung.graphx;
2+
3+
import java.io.Serializable;
4+
import java.util.ArrayList;
5+
import java.util.HashMap;
6+
import java.util.List;
7+
import java.util.Map;
8+
9+
import org.apache.spark.SparkConf;
10+
import org.apache.spark.api.java.JavaRDD;
11+
import org.apache.spark.api.java.JavaSparkContext;
12+
import org.apache.spark.graphx.Edge;
13+
import org.apache.spark.graphx.Graph;
14+
import org.apache.spark.storage.StorageLevel;
15+
16+
import scala.Function1;
17+
import scala.Function2;
18+
import scala.Predef;
19+
import scala.reflect.ClassTag;
20+
import scala.reflect.ClassTag$;
21+
22+
public class GraphLoader {
23+
24+
public static Map<Long, User> USERS = new HashMap<>();
25+
public static ClassTag<Relationship> RELATIONSHIP_TAG = ClassTag$.MODULE$.apply(Relationship.class);
26+
public static ClassTag<User> USER_TAG = ClassTag$.MODULE$.apply(User.class);
27+
28+
public JavaSparkContext getSparkContext() {
29+
SparkConf sparkConf = new SparkConf().setAppName("SparkGraphX").setMaster("local[*]");
30+
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
31+
return javaSparkContext;
32+
}
33+
34+
public Graph<User, Relationship> mapUserRelationship() {
35+
JavaSparkContext javaSparkContext = getSparkContext();
36+
37+
List<Edge<String>> edges = getEdges();
38+
39+
JavaRDD<Edge<String>> edgeJavaRDD = javaSparkContext.parallelize(edges);
40+
41+
ClassTag<String> stringTag = ClassTag$.MODULE$.apply(String.class);
42+
43+
Graph<String, String> graph = Graph.fromEdges(edgeJavaRDD.rdd(), "Following", StorageLevel.MEMORY_ONLY(),
44+
StorageLevel.MEMORY_ONLY(), stringTag, stringTag);
45+
46+
Graph<String, Relationship> relationshipGraph = graph.mapEdges(new MapRelationship(), RELATIONSHIP_TAG);
47+
Predef.$eq$colon$eq<String, User> eq = null;
48+
49+
return relationshipGraph.mapVertices(new MapUser(), USER_TAG, eq);
50+
}
51+
52+
public List<Edge<String>> getEdges() {
53+
List<Edge<String>> edges = new ArrayList<>();
54+
edges.add(new Edge<>(1L, 2L, "Friend"));
55+
edges.add(new Edge<>(1L, 4L, "Following"));
56+
edges.add(new Edge<>(2L, 4L, "Friend"));
57+
edges.add(new Edge<>(3L, 1L, "Relative"));
58+
edges.add(new Edge<>(3L, 4L, "Relative"));
59+
60+
return edges;
61+
}
62+
63+
public Map<Long, User> getUsers() {
64+
if (USERS.isEmpty()) {
65+
loadUsers();
66+
}
67+
68+
return USERS;
69+
}
70+
71+
private void loadUsers() {
72+
User john = new User(1L, "John");
73+
User martin = new User(2L, "Martin");
74+
User peter = new User(3L, "Peter");
75+
User alicia = new User(4L, "Alicia");
76+
77+
USERS.put(1L, john);
78+
USERS.put(2L, martin);
79+
USERS.put(3L, peter);
80+
USERS.put(4L, alicia);
81+
}
82+
83+
private static class MapRelationship implements Function1<Edge<String>, Relationship>, Serializable {
84+
85+
@Override
86+
public Relationship apply(Edge<String> edge) {
87+
return new Relationship(edge.attr, new GraphLoader().getUsers().get(edge.srcId()), USERS.get(edge.dstId()));
88+
}
89+
}
90+
91+
private static class MapUser implements Function2<Object, String, User>, Serializable {
92+
@Override
93+
public User apply(Object id, String name) {
94+
return new GraphLoader().getUsers().get((Long) id);
95+
}
96+
}
97+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.baeldung.graphx;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
import org.apache.log4j.Level;
7+
import org.apache.log4j.Logger;
8+
import org.apache.spark.api.java.function.VoidFunction;
9+
import org.apache.spark.graphx.Edge;
10+
import org.apache.spark.graphx.Graph;
11+
import org.apache.spark.graphx.VertexRDD;
12+
13+
import scala.Tuple2;
14+
15+
public class GraphOperations {
16+
public static Map<Long, User> USERS = new HashMap<>();
17+
18+
public static void main(String[] args) {
19+
Logger.getLogger("org").setLevel(Level.OFF);
20+
GraphOperations operations = new GraphOperations();
21+
operations.doOperations();
22+
}
23+
24+
private void doOperations() {
25+
GraphLoader loader = new GraphLoader();
26+
Graph<User, Relationship> userGraph = loader.mapUserRelationship();
27+
28+
System.out.println("Mapped Users: ");
29+
userGraph.vertices().toJavaRDD().foreach((VoidFunction<Tuple2<Object, User>>) tuple -> System.out
30+
.println("id: " + tuple._1 + " name: " + tuple._2));
31+
32+
System.out.println("Mapped Relationships: ");
33+
userGraph.edges().toJavaRDD()
34+
.foreach((VoidFunction<Edge<Relationship>>) edge -> System.out.println(edge.attr().toString()));
35+
36+
VertexRDD<Object> degreesVerticesRDD = userGraph.ops().degrees();
37+
VertexRDD<Object> inDegreesVerticesRDD = userGraph.ops().inDegrees();
38+
VertexRDD<Object> outDegreesVerticesRDD = userGraph.ops().outDegrees();
39+
40+
System.out.println("degrees: ");
41+
degreesVerticesRDD.toJavaRDD().foreach((VoidFunction<Tuple2<Object, Object>>) tuple -> System.out
42+
.println("id: " + tuple._1 + " count: " + tuple._2));
43+
44+
System.out.println("inDegrees: ");
45+
inDegreesVerticesRDD.toJavaRDD().foreach((VoidFunction<Tuple2<Object, Object>>) tuple -> System.out
46+
.println("id: " + tuple._1 + " count: " + tuple._2));
47+
48+
System.out.println("outDegrees: ");
49+
outDegreesVerticesRDD.toJavaRDD().foreach((VoidFunction<Tuple2<Object, Object>>) tuple -> System.out
50+
.println("id: " + tuple._1 + " count: " + tuple._2));
51+
}
52+
53+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.baeldung.graphx;
2+
3+
public class Relationship {
4+
private String type;
5+
private User source;
6+
private User target;
7+
8+
public Relationship(String type, User source, User target) {
9+
this.type = type;
10+
this.source = source;
11+
this.target = target;
12+
}
13+
14+
public String getType() {
15+
return type;
16+
}
17+
18+
public User getSource() {
19+
return source;
20+
}
21+
22+
public User getTarget() {
23+
return target;
24+
}
25+
26+
@Override
27+
public String toString() {
28+
return getSource().toString() + " -- " + getType() + " --> " + getTarget().toString();
29+
}
30+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.baeldung.graphx;
2+
3+
public class User {
4+
5+
private Long id;
6+
private String name;
7+
8+
public User(long id, String name) {
9+
this.id = id;
10+
this.name = name;
11+
}
12+
13+
public Long getId() {
14+
return id;
15+
}
16+
17+
public String getName() {
18+
return name;
19+
}
20+
21+
@Override
22+
public String toString() {
23+
return "(" + id + "-" + name + ")";
24+
}
25+
}

0 commit comments

Comments
 (0)