GraphApp.scala 3.08 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package no.uninett
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.SparkContext._

/**
 * Spark/GraphX job that reads comma-separated flow records, builds a directed
 * graph of IPv4 source -> destination edges labelled with the destination
 * port, keeps only the edges for [[GraphApp.TargetPort]], and prints the
 * chains of hosts it finds, grouped by cluster label.
 *
 * Each vertex carries a pair `(Int, Long)`. The Pregel run below merges the
 * first component with `max` and the second with `min`; the first component
 * appears to act as a hop count along a chain and the second as the smallest
 * vertex id seen, which is used as the cluster label.
 *
 * Usage: the first program argument, when present, is the input path;
 * otherwise [[GraphApp.DefaultInputPath]] is read.
 */
object GraphApp
{
    /** Index of the destination-port column in a comma-split input record. */
    val DestPort = 6
    /** Index of the source-IP column in a comma-split input record. */
    val SrcIP = 3
    /** Index of the destination-IP column in a comma-split input record. */
    val DestIP = 4

    /** Input file read when no path is given on the command line. */
    val DefaultInputPath = "hdfs://daas/daas_flows/trd-gw-2014-05-03.csv"

    /**
     * Destination port whose edges are analysed.
     * NOTE(review): the original code stored this subgraph in a variable named
     * `httpgraph`, yet 53 is the well-known DNS port (HTTP is 80) -- confirm
     * which service was intended.
     */
    val TargetPort = 53

    /** Upper bound on Pregel supersteps. */
    private val MaxIterations = 10

    /** Smallest number of columns a record needs so every indexed field exists. */
    private val MinColumns = Seq(SrcIP, DestIP, DestPort).max + 1

    /** Packs a dotted IPv4 string into a Long by folding its octets base 256. */
    private def ip2id(ip: String): Long =
        ip.split("\\.").map(_.toLong).reduceLeft((a, b) => a * 256 + b)

    /** Renders a packed IPv4 id back into dotted notation. */
    private def id2ip(id: Long): String =
        s"${id>>24}.${(id>>16)&0xff}.${(id>>8)&0xff}.${id&0xff}"

    /**
     * Turns one split record into an edge `srcIp -> dstIp` labelled with the
     * destination port.
     *
     * @param fields trimmed columns of one input line
     * @return the edge, or `None` when the record is too short, has an address
     *         containing ':' or a port containing '.', or has fields that do
     *         not parse as numbers
     */
    private def parseEdge(fields: Array[String]): Option[Edge[Int]] = {
        if (fields.length < MinColumns) {
            None
        } else {
            val src = fields(SrcIP)
            val dst = fields(DestIP)
            val port = fields(DestPort)
            if (src.contains(":") || dst.contains(":") || port.contains(".")) {
                None
            } else {
                // A malformed row must not abort the whole job; drop it instead.
                scala.util.Try(Edge(ip2id(src), ip2id(dst), port.toInt)).toOption
            }
        }
    }

    /**
     * Entry point: creates the Spark context, runs the analysis and always
     * stops the context afterwards.
     *
     * @param args optional; `args(0)` overrides the input path
     */
    def main(args: Array[String]):Unit={
        val inputPath = args.headOption.getOrElse(DefaultInputPath)
        val appName = "scala test"
        val conf = new SparkConf().setAppName(appName).set("spark.mesos.coarse","true")
        val sc = new org.apache.spark.SparkContext(conf)
        try {
            run(sc, inputPath)
        } finally {
            // Release cluster resources even when the job fails.
            sc.stop()
        }
    }

    /**
     * Builds the port-filtered graph from `inputPath`, labels it with Pregel
     * and prints one "cluster:" paragraph per label to standard output.
     */
    private def run(sc: org.apache.spark.SparkContext, inputPath: String): Unit = {
        val csv = sc.textFile(inputPath).map(line => line.split(",").map(_.trim))
        val edges = csv.flatMap(fields => parseEdge(fields)).distinct()
        val graph = Graph.fromEdges(edges, (0, Long.MaxValue))
        val portGraph = graph.subgraph(edge => edge.attr == TargetPort)

        val lengthGraph = portGraph.pregel((0, Long.MaxValue), MaxIterations)(
            // Vertex program: keep the largest first component and the smallest second.
            (id, attr, newAttr) => (math.max(newAttr._1, attr._1), math.min(newAttr._2, attr._2)),
            triplet => {
                val src = triplet.srcAttr
                val dst = triplet.dstAttr
                if (src._2 == dst._2 && src._2 < Long.MaxValue) { // break cycles
                    Iterator.empty
                }
                else if (src._1 + 1 <= dst._1 && src._2 > dst._2) {
                    // Destination already holds a smaller label: hand it back to the source.
                    Iterator((triplet.srcId, (src._1, dst._2)))
                }
                else if (src._1 + 1 >= dst._1) {
                    // Push count + 1 and the smallest id seen so far to the destination.
                    val minNode: Long = math.min(math.min(src._2, triplet.srcId), dst._2)
                    Iterator((triplet.dstId, (src._1 + 1, minNode)))
                } else {
                    Iterator.empty
                }
            },
            // Merge concurrent messages with the same max/min rule as the vertex program.
            (a, b) => (math.max(a._1, b._1), math.min(a._2, b._2))
        )
        lengthGraph.cache()

        // Labels that own at least one vertex whose first component exceeds 1.
        // A Set gives constant-time membership tests inside the vertex predicate.
        val clusterLabels = lengthGraph.vertices
            .filter(v => v._2._1 > 1)
            .map(v => v._2._2)
            .distinct()
            .collect()
            .toSet

        lengthGraph.subgraph(vpred = (id, attr) => clusterLabels.contains(attr._2))
            .triplets
            .filter(t => t.srcAttr._1 + 1 == t.dstAttr._1)
            // Format before grouping so only plain values are shuffled.
            .map(t => (
                t.dstAttr._2,
                (t.srcAttr._1,
                 s"${t.srcAttr._1} ${t.dstAttr._1} ${t.srcAttr._2} ${t.dstAttr._2} ${id2ip(t.srcId)} ${id2ip(t.dstId)} ${t.attr}")
            ))
            .groupByKey()
            .map { case (_, rows) =>
                rows.toArray.sortBy(_._1).map(_._2).mkString("cluster:\n  ", "\n  ", "\n")
            }
            .collect()
            .foreach(println(_))
        //lengthGraph.vertices.map(v => (v._2, id2ip(v._1))).sortByKey(false).take(15).foreach(a => println(s"${a._1} ${a._2}"))
    }
}