package no.uninett

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.SparkContext._

/**
 * Finds chains of hosts linked by flows on a single port (179 = BGP) in one
 * day of netflow CSV data. Builds a GraphX graph of IPv4 src→dst edges, runs
 * a Pregel computation that propagates the longest known path to each vertex,
 * and prints the longest chain found per starting host.
 */
object GraphApp {

  // Column indices into the flow CSV records.
  val DestPort = 6
  val SrcIP = 3
  val DestIP = 4

  /**
   * Port whose flows are kept when building the subgraph.
   * NOTE(review): 179 is BGP, not HTTP — the original local name "httpgraph"
   * was misleading; confirm which port was actually intended.
   */
  val FilterPort = 179

  /** Max Pregel supersteps, bounding the longest discoverable chain. */
  val MaxIterations = 75

  def main(args: Array[String]): Unit = {
    val appName = "scala test"
    val conf = new SparkConf().setAppName(appName).set("spark.mesos.coarse", "true")
    val sc = new org.apache.spark.SparkContext(conf)

    val lines = sc.textFile("hdfs://daas/daas_flows/trd-gw-2014-05-03.csv")
    val csv = lines.map(line => line.split(",").map(field => field.trim))
    val tuples = csv.map(row => (row(SrcIP), row(DestIP), row(DestPort)))

    // Keep IPv4-only, well-formed flows: a ":" marks an IPv6 address, and a
    // "." in the port field marks a malformed record.
    val v4only = tuples.filter(r =>
      !r._1.contains(":") && !r._2.contains(":") && !r._3.contains("."))

    // Pack a dotted-quad IPv4 address into a Long vertex id, and back.
    val ip2id = (ip: String) =>
      ip.split("\\.").map(_.toLong).reduceLeft((a, b) => a * 256 + b)
    val id2ip = (id: Long) =>
      s"${id >> 24}.${(id >> 16) & 0xff}.${(id >> 8) & 0xff}.${id & 0xff}"

    val edges = v4only.map(r => new Edge(ip2id(r._1), ip2id(r._2), r._3.toInt)).distinct()

    // Vertex attribute = best known path (as vertex ids) ending at that vertex.
    val graph = Graph.fromEdges(edges, Array[Long]())

    val bgpGraph = graph.subgraph(edge => edge.attr == FilterPort)

    // Pregel: each vertex accumulates the longest path reaching it. A message
    // is forwarded only when the sender's path is at least as long as the
    // receiver's and does not already contain the receiver (cycle avoidance).
    val lengthGraph = bgpGraph.pregel(Array[Long](), MaxIterations)(
      // Vertex program: adopt the incoming path and append ourselves.
      (id, attr, newAttr) => newAttr :+ id,
      // Send: propagate src's path to dst when it can extend dst's path.
      triplet => {
        if (triplet.srcAttr.length >= triplet.dstAttr.length &&
            !triplet.srcAttr.contains(triplet.dstId)) {
          Iterator((triplet.dstId, triplet.srcAttr))
        } else {
          Iterator.empty
        }
      },
      // Merge: keep the longer path; break ties on the smaller start vertex.
      (a, b) => {
        if (a.length > b.length) a
        else if (b.length > a.length) b
        else if (a(0) < b(0)) a
        else b
      }
    )
    lengthGraph.cache()

    // Chains longer than two hops are interesting; keep the longest chain
    // per starting vertex.
    val candidates = lengthGraph.vertices.filter(v => v._2.length > 2)
    val results = candidates
      .map(v => (v._2(0), v._2))
      .reduceByKey((a, b) => if (a.length > b.length) a else b)
      .map(_._2)

    // Render each chain as "Chain length N: a.b.c.d -> ..." and print.
    results.map { path =>
      path.map(id2ip).mkString("Chain length " + path.length + ": ", " -> ", "")
    }.collect.foreach(println(_))
  }
}