package no.uninett

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.SparkContext._

import scala.util.Try

/**
 * Spark/GraphX job that builds a graph from a CSV flow export and prints
 * chains of hosts connected by traffic to one destination port.
 *
 * Vertices are IPv4 addresses packed into a `Long`; each edge carries the
 * destination port of a flow as its attribute. Only edges whose port equals
 * `FilterPort` are kept. A Pregel pass (at most 10 iterations) then labels
 * every vertex with a pair `(depth, label)`:
 *  - `depth` starts at 0 and is raised to `source depth + 1` along edges;
 *  - `label` starts at `Long.MaxValue` and is lowered to the smallest vertex
 *    id propagated to the vertex.
 *
 * Finally, for every label that has at least one vertex with `depth > 1`, the
 * edges that advance the depth by exactly one are printed, grouped by label.
 */
object GraphApp {
  /** Zero-based CSV column holding the destination port. */
  val DestPort = 6
  /** Zero-based CSV column holding the source IP address. */
  val SrcIP = 3
  /** Zero-based CSV column holding the destination IP address. */
  val DestIP = 4

  /** Minimum number of columns a row needs so that all three fields exist. */
  private val MinFields = Seq(SrcIP, DestIP, DestPort).max + 1

  /**
   * Destination port of the flows kept in the graph.
   *
   * NOTE(review): the original code stored this subgraph in a local named
   * `httpgraph`, yet filtered on 53 (conventionally DNS, not HTTP). The value
   * is kept as it was; confirm which service was actually intended.
   */
  private val FilterPort = 53

  /**
   * Packs a dotted-quad IPv4 address into a `Long` (`a.b.c.d` becomes
   * `((a * 256 + b) * 256 + c) * 256 + d`).
   *
   * @param ip textual address
   * @return the packed id, or `None` when `ip` is not exactly four decimal
   *         octets in the range 0..255 (this also rejects IPv6 addresses)
   */
  private def ip2id(ip: String): Option[Long] = {
    // limit -1 keeps trailing empty parts, so "1.2.3.4." is rejected too
    val octets = ip.split("\\.", -1)
    if (octets.length != 4) None
    else
      octets.foldLeft(Option(0L)) { (acc, octet) =>
        for {
          sum <- acc
          value <- Try(octet.toLong).toOption
          if value >= 0 && value <= 255
        } yield sum * 256 + value
      }
  }

  /**
   * Inverse of [[ip2id]]: renders a packed id as dotted-quad text.
   *
   * @param id packed IPv4 address
   * @return the address as `a.b.c.d`
   */
  private def id2ip(id: Long): String =
    s"${id >> 24}.${(id >> 16) & 0xff}.${(id >> 8) & 0xff}.${id & 0xff}"

  /**
   * Turns one CSV row into a graph edge `source -> destination` whose
   * attribute is the destination port.
   *
   * Rows are skipped (instead of failing the whole job) when they have too few
   * columns, when either address is not IPv4, or when the port column is not
   * an integer (e.g. it contains a ".").
   *
   * @param line raw CSV row
   * @return the edge, or `None` for rows that cannot be used
   */
  private def parseEdge(line: String): Option[Edge[Int]] = {
    val fields = line.split(",").map(_.trim)
    if (fields.length < MinFields) None
    else
      for {
        src <- ip2id(fields(SrcIP))
        dst <- ip2id(fields(DestIP))
        port <- Try(fields(DestPort).toInt).toOption
      } yield Edge(src, dst, port)
  }

  /**
   * Entry point: creates the Spark context, runs the analysis and always
   * stops the context afterwards, even when the job fails.
   *
   * @param args command-line arguments (currently unused)
   */
  def main(args: Array[String]): Unit = {
    val appName = "scala test"
    val conf = new SparkConf().setAppName(appName).set("spark.mesos.coarse", "true")
    val sc = new SparkContext(conf)
    try {
      run(sc)
    } finally {
      // release cluster resources regardless of how the job ended
      sc.stop()
    }
  }

  /**
   * Loads the flows, builds and labels the graph, and prints the clusters.
   *
   * @param sc active Spark context used to read the input file
   */
  private def run(sc: SparkContext): Unit = {
    val lines = sc.textFile("hdfs://daas/daas_flows/trd-gw-2014-05-03.csv")
    val edges = lines.flatMap(line => parseEdge(line).iterator).distinct()

    // every vertex starts as (depth 0, label "unset" = Long.MaxValue)
    val graph = Graph.fromEdges(edges, (0, Long.MaxValue))
    val portGraph = graph.subgraph(edge => edge.attr == FilterPort)

    val lengthGraph = portGraph.pregel((0, Long.MaxValue), 10)(
      // vertex program: keep the largest depth and the smallest label seen
      (id, attr, msg) => (math.max(msg._1, attr._1), math.min(msg._2, attr._2)),
      triplet => {
        val (srcDepth, srcLabel) = triplet.srcAttr
        val (dstDepth, dstLabel) = triplet.dstAttr
        if (srcLabel == dstLabel && srcLabel < Long.MaxValue) {
          // break cycles: both ends already carry the same label
          Iterator.empty
        } else if (srcDepth + 1 <= dstDepth && srcLabel > dstLabel) {
          // destination is already deep enough; only push its smaller label back
          Iterator((triplet.srcId, (srcDepth, dstLabel)))
        } else if (srcDepth + 1 >= dstDepth) {
          // advance the destination one step past the source
          val minNode: Long = math.min(math.min(srcLabel, triplet.srcId), dstLabel)
          Iterator((triplet.dstId, (srcDepth + 1, minNode)))
        } else {
          Iterator.empty
        }
      },
      // merge concurrent messages the same way the vertex program does
      (a, b) => (math.max(a._1, b._1), math.min(a._2, b._2))
    ).cache()

    // labels that contain at least one vertex deeper than 1; a Set gives
    // constant-time membership tests inside the vertex predicate below
    val clusterLabels = lengthGraph.vertices
      .filter { case (_, (depth, _)) => depth > 1 }
      .map { case (_, (_, label)) => label }
      .distinct()
      .collect()
      .toSet

    lengthGraph
      .subgraph(vpred = (id, attr) => clusterLabels.contains(attr._2))
      .triplets
      .filter(t => t.srcAttr._1 + 1 == t.dstAttr._1)
      // reduce each triplet to plain values before grouping, so the shuffle
      // only handles (label, (source depth, output line))
      .map { t =>
        val text = s"${t.srcAttr._1} ${t.dstAttr._1} ${t.srcAttr._2} ${t.dstAttr._2} " +
          s"${id2ip(t.srcId)} ${id2ip(t.dstId)} ${t.attr}"
        (t.dstAttr._2, (t.srcAttr._1, text))
      }
      .groupByKey()
      .map { case (_, members) =>
        members.toArray.sortBy(_._1).map(_._2).mkString("cluster:\n ", "\n ", "\n")
      }
      .collect()
      .foreach(println(_))

    // Debug aid, disabled:
    //lengthGraph.vertices.map(v => (v._2, id2ip(v._1))).sortByKey(false).take(15).foreach(a => println(s"${a._1} ${a._2}"))
  }
}