Getting started with Apache Spark GraphX – Part 1

This is a two-part series on Apache Spark GraphX. The second part is located here.

From DNA sequencing to anti-terrorism efforts, graph computation is increasingly ubiquitous in today’s world. Graphs have vertices and edges: the vertices are the entities in the graph, while the edges are the connections between those entities. Graphs can be undirected or directed.

For example, in the Facebook social network, connections are undirected. Once you accept your mother’s friend request, the connection exists in both directions. Twitter, meanwhile, has the concept of “followers” as opposed to friends. The Twitter graph is directed, as anyone can follow you without you following them back. Anyone with a Twitter account likely follows at least one famous writer, athlete or celebrity, but it’s not likely they follow you back.
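To make the distinction concrete, here is a minimal sketch of a one-way “follows” relationship expressed in GraphX, the library we’ll use later in this post (the names are illustrative, and sc is the SparkContext provided by the spark-shell):

import org.apache.spark.graphx.{Edge, Graph}

// A directed edge: Alice (vertex 1) follows Bob (vertex 2),
// but there is no edge back from Bob to Alice.
val users = sc.parallelize(Seq((1L, "Alice"), (2L, "Bob")))
val follows = sc.parallelize(Seq(Edge(1L, 2L, "follows")))
val followerGraph = Graph(users, follows)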

Graphs have been traversed for fun, profit, and the advancement of science since at least 1736 when Euler laid out the solution to the famous Seven Bridges of Königsberg problem. However, they remain one of the most active research areas in both applied and basic research as traversal of non-trivial graphs continues to be challenging.

One of the most profitable business uses of graph traversal was the invention of PageRank by Google’s Sergey Brin and Larry Page while they were graduate students at Stanford. The idea, in retrospect, feels obvious. They noted that since academic papers typically cite other papers, they can be modeled as a graph. In addition, the number and quality of papers which cite a given paper might indicate its importance. They then realized this same structure existed on the nascent World Wide Web.

At that time, search engines were focusing mostly on natural language processing to provide relevant results. Google added PageRank to search result scores and, as they say, the rest is history.

I apologize for the gloomy example, but I’ve found it clicks with many readers. It’s widely known that bosses in the criminal world, in order to reduce risk, use couriers to communicate with middle management. Additionally, intelligence agencies often collect what is known as “metadata” about text messages and phone calls. That is, they don’t collect the content of electronic communication, but rather the record of who contacts whom and when.

For example, they collect the fact that number 555-4212 texted 555-9421. Armed with this metadata and a small amount of knowledge of the criminal organization, law enforcement can gain useful insights using graph traversal.

For example, given the following metadata about phone communication, can you find the most important nodes? If you are like me, even though the list is trivially small, it’s still hard to reason about.

555-4172 555-2086
555-2174 555-1682
555-2086 555-2174
555-1869 555-1682
555-1682 555-7353
555-2250 555-2174
555-2889 555-1869
555-4172 555-1682
555-2889 555-4172
555-1869 555-2250
555-4731 555-1682
555-1891 555-1869

Let us investigate. Apache Spark comes with a native graph traversal library, GraphX, which we’ll use in this example. Before we process this data with GraphX, we have to transform the input file into the correct format. GraphX takes an “edge list file” in which each line is an edge, and an edge is a space-separated pair of vertices. Each vertex is represented by a 64-bit integer that should be unique to that vertex.

On string data, a 64-bit hash can be used to generate the vertex id (we’ll sketch that after the script below). In our case, if we remove the dash from each number, each vertex becomes a unique integer. To perform this translation, we use the following Spark script:

// Extract the distinct phone numbers (the vertices) from the raw metadata
val vertices = sc.textFile("metadata-raw.txt").
 flatMap { line => line.split("\\s+") }.distinct()
// Write the lookup file mapping each vertex id to its original phone number
vertices.map { vertex => vertex.replace("-", "") + "\t" + vertex }.
 saveAsTextFile("metadata-lookup")
// Write the edge list: one pair of vertex ids per line
sc.textFile("metadata-raw.txt").
 map { line => line.split("\\s+") }.
 filter { fields => fields.length == 2 }.
 map { fields => fields(0).replace("-", "") + "\t" + fields(1).replace("-", "") }.
 saveAsTextFile("metadata-processed")
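As an aside, our phone numbers conveniently reduce to unique integers once the dash is stripped, but arbitrary string keys need a hash. A minimal sketch of one way to do it (the vertexId helper is our own, not part of GraphX):

import java.nio.ByteBuffer
import java.security.MessageDigest

// Hypothetical helper: derive a 64-bit vertex id from a string key by
// taking the first 8 bytes of its MD5 digest. Collisions are possible
// in principle, so keep a lookup file to detect and resolve them.
def vertexId(key: String): Long = {
 val digest = MessageDigest.getInstance("MD5").digest(key.getBytes("UTF-8"))
 ByteBuffer.wrap(digest, 0, 8).getLong
}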

This program outputs two directories. The first is the edge list, which we named “metadata-processed”:

5554172 5552086
5552174 5551682
5552086 5552174
5551869 5551682
5551682 5557353
5552250 5552174
5552889 5551869
5554172 5551682
5552889 5554172
5551869 5552250
5554731 5551682
5551891 5551869

The vertex “5554172” is obviously “555-4172”, but in real-world scenarios the vertex id will be opaque, especially when it’s a hash. As such, we need a “lookup” or dictionary file to make use of the results of PageRank, which is why the script also generates the “metadata-lookup” directory. It looks as follows:

5552174 555-2174
5552086 555-2086
5554172 555-4172
5551869 555-1869
5557353 555-7353
5551682 555-1682
5552250 555-2250
5551891 555-1891
5552889 555-2889
5554731 555-4731

After running PageRank, we can join the results against this lookup file to get the original vertices back. Now we have everything we need. Below is our PageRank program:

import org.apache.spark.graphx._
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "metadata-processed")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// join the ids with the phone numbers
val entities = sc.textFile("metadata-lookup").map { line =>
 val fields = line.split("\\s+")
 (fields(0).toLong, fields(1))
}
val ranksByVertex = entities.join(ranks).map {
 case (id, (vertex, rank)) => (rank, vertex)
}
// print out the top 5 entities
println(ranksByVertex.sortByKey(false).take(5).mkString("\n"))

We won’t go into the details of PageRank, as it’s covered in depth elsewhere. However, you are likely asking what 0.0001 means. We are using the version of PageRank that runs until the ranks converge, and 0.0001 is our convergence tolerance: iteration continues until the ranks stop changing by more than that amount. After executing PageRank, we join the results against our dictionary file, sort the results by rank, and output the first five.

For those new to Scala, you might be curious about the use of case in the aforementioned join. It’s a simple and compact way to get nice variable names: that case matches every row and destructures it into nicely named variables. One, less nice, alternative is:

// Same result as the case version above, via opaque tuple accessors
val ranksByVertex = entities.join(ranks).map {
  row => (row._2._2, row._2._1)
}
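Incidentally, if you’d rather cap the amount of work than wait for convergence, GraphX also ships a fixed-iteration variant of PageRank. A minimal sketch, with the iteration count chosen arbitrarily:

// Run exactly 20 PageRank iterations instead of iterating
// until the ranks move less than a tolerance.
val staticRanks = graph.staticPageRank(20).vertices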

Back to our results. The PageRank score is on the left and the phone number in question is on the right:

(1.0280446874999998,555-1682)
(1.0238379843749996,555-7353)
(0.6054937499999999,555-2174)
(0.34124999999999994,555-1869)
(0.29503124999999997,555-2250)

The vertex 555-1682 has the highest rank. When traversing the web or research papers, the highest rank gives some indication of importance, and the same is true here. However, as discussed above, criminal networks are organized differently from the web or academic research.

Note that 555-7353 has a high PageRank despite having only a single edge in the edge list. This is because the vertex it communicates with, 555-1682, is extremely important in this network. The logic is the same as in the traditional uses of PageRank: if a “credible” academic paper cites another paper, the cited paper must also be credible. In a criminal network, this might mean that 555-1682 handles communication for 555-7353, indicating that 555-7353 is the boss.
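We can even check the arithmetic by hand. GraphX’s PageRank uses a default reset probability of 0.15, and 555-1682’s only outgoing edge points at 555-7353, so all of its rank flows there:

rank(555-7353) = 0.15 + 0.85 × rank(555-1682) = 0.15 + 0.85 × 1.0280 ≈ 1.0238

which matches the output above.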