// GraphApp.scala
package no.uninett
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.SparkContext._

/**
 * Spark/GraphX job that looks for chains of IPv4 hosts linked by flows to a
 * single destination port.
 *
 * Input lines are split on commas. The source address, destination address and
 * destination port are read from the zero-based columns [[SrcIP]], [[DestIP]]
 * and [[DestPort]] (presumably one network flow per line — confirm against the
 * data). Each distinct (source, destination, port) triple becomes a directed
 * edge. Rows with too few columns, non-IPv4 addresses or an invalid port are
 * skipped.
 *
 * After the graph is restricted to one port, a Pregel computation gives every
 * vertex a cycle-free path ending at it. For each origin host, the longest such
 * path with more than two hosts is printed.
 *
 * The path search is greedy: a path is forwarded only when it is at least as
 * long as the receiver's current one. The results are therefore long chains,
 * not guaranteed-longest ones.
 */
object GraphApp {
    /** Zero-based CSV column holding the destination port. */
    val DestPort = 6
    /** Zero-based CSV column holding the source IPv4 address. */
    val SrcIP = 3
    /** Zero-based CSV column holding the destination IPv4 address. */
    val DestIP = 4

    /** Minimum number of CSV fields a line needs before all three columns can be read. */
    private val RequiredColumns: Int = 1 + (DestPort max SrcIP max DestIP)

    /** Input read when no path is passed as the first program argument. */
    private val DefaultInput: String = "hdfs://daas/daas_flows/trd-gw-2014-05-03.csv"

    /**
     * Destination port kept when none is passed as the second program argument.
     * NOTE(review): 179 is the IANA-registered BGP port, not HTTP (80) — confirm
     * this is the intended port.
     */
    private val DefaultPort: Int = 179

    /** Maximum number of Pregel supersteps; this also caps how long a chain can grow. */
    private val MaxIterations: Int = 75

    /**
     * Runs the job on a new SparkContext and prints one line per chain found.
     *
     * @param args optional: `args(0)` overrides the input path and `args(1)` the
     *             destination port; with no arguments the defaults above are used.
     * @throws IllegalArgumentException if `args(1)` is not an integer in 0-65535.
     */
    def main(args: Array[String]): Unit = {
        // Validate arguments before starting Spark so bad input fails fast.
        val input = args.headOption.getOrElse(DefaultInput)
        val port = args.drop(1).headOption.fold(DefaultPort) { raw =>
            parsePort(raw).getOrElse(throw new IllegalArgumentException(s"invalid destination port: $raw"))
        }

        val conf = new SparkConf().setAppName("scala test").set("spark.mesos.coarse", "true")
        val sc = new org.apache.spark.SparkContext(conf)
        try {
            val edges = sc.textFile(input)
                .flatMap(line => parseFlow(line).toList)
                .map { case (src, dst, dstPort) => Edge(src, dst, dstPort) }
                .distinct()

            // Every vertex starts with an empty path. Pregel's first superstep
            // turns it into the single-element path Array(id).
            val graph = Graph.fromEdges(edges, Array[Long]())
            val portGraph = graph.subgraph(edge => edge.attr == port)
            val pathGraph = portGraph.pregel(Array[Long](), MaxIterations)(
                // The new path is the best incoming path with this vertex appended.
                (id, _, incoming) => incoming :+ id,
                // Forward the source's path unless it is shorter than the target's
                // current path or already visits the target (which would form a cycle).
                triplet => {
                    val path = triplet.srcAttr
                    if (path.length >= triplet.dstAttr.length && !path.contains(triplet.dstId)) {
                        Iterator((triplet.dstId, path))
                    } else {
                        Iterator.empty
                    }
                },
                (a, b) => preferLongerPath(a, b)
            )
            pathGraph.cache()

            // Keep chains of at least three hosts, reduced to the longest one per origin host.
            val chains = pathGraph.vertices
                .filter { case (_, path) => path.length > 2 }
                .map { case (_, path) => (path(0), path) }
                .reduceByKey((a, b) => if (a.length > b.length) a else b)
                .map { case (_, path) => path }

            chains
                .map(path => path.map(id => formatIPv4(id)).mkString(s"Chain length ${path.length}: ", " -> ", ""))
                .collect()
                .foreach(println)
        } finally {
            // Release cluster resources even when the job fails.
            sc.stop()
        }
    }

    /**
     * Extracts (source id, destination id, destination port) from one CSV line.
     *
     * @return None when the line has too few fields, either address is not a
     *         dotted-quad IPv4 address, or the port is not an integer in 0-65535.
     */
    private def parseFlow(line: String): Option[(Long, Long, Int)] = {
        val fields = line.split(",").map(_.trim)
        if (fields.length < RequiredColumns) None
        else for {
            src <- parseIPv4(fields(SrcIP))
            dst <- parseIPv4(fields(DestIP))
            dstPort <- parsePort(fields(DestPort))
        } yield (src, dst, dstPort)
    }

    /**
     * Encodes a dotted-quad address a.b.c.d as the vertex id
     * ((a * 256 + b) * 256 + c) * 256 + d.
     *
     * @return None unless `text` is exactly four decimal octets, each in 0-255.
     */
    private def parseIPv4(text: String): Option[Long] = {
        // Limit -1 keeps trailing empty parts, so "1.2.3." is rejected rather than truncated.
        val octets = text.split("\\.", -1)
        if (octets.length != 4) None
        else octets.foldLeft(Option(0L)) { (acc, octet) =>
            for (id <- acc; value <- parseBoundedInt(octet, 3, 255)) yield id * 256 + value
        }
    }

    /** Inverse of [[parseIPv4]]: renders a vertex id as a dotted-quad string. */
    private def formatIPv4(id: Long): String =
        Seq(24, 16, 8, 0).map(shift => (id >> shift) & 0xff).mkString(".")

    /** Parses a port number in 0-65535. */
    private def parsePort(text: String): Option[Int] = parseBoundedInt(text, 5, 65535)

    /**
     * Parses `text` as at most `maxDigits` ASCII decimal digits (no sign) whose
     * value does not exceed `max`; anything else yields None.
     */
    private def parseBoundedInt(text: String, maxDigits: Int, max: Int): Option[Int] =
        if (text.isEmpty || text.length > maxDigits || !text.forall(c => c >= '0' && c <= '9')) None
        else {
            val value = text.toInt
            if (value <= max) Some(value) else None
        }

    /**
     * Pregel message combiner. It keeps the longer path. On equal length it keeps
     * the path whose first vertex id is smaller, falling back to `b` when those
     * also match (or when both paths are empty).
     */
    private def preferLongerPath(a: Array[Long], b: Array[Long]): Array[Long] =
        if (a.length > b.length) a
        else if (b.length > a.length) b
        else if (a.nonEmpty && a(0) < b(0)) a
        else b
}