Commit 09a2842b authored by Sigmund Augdal's avatar Sigmund Augdal

Work in progress graph app test

parent 475a6402
package no.uninett
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.SparkContext._
/**
* Hello world!
*
*/
object GraphApp
{
val DestPort = 6
val SrcIP = 3
val DestIP = 4
def main(args: Array[String]):Unit={
val appName = "scala test"
val conf = new SparkConf().setAppName(appName).set("spark.mesos.coarse","true")
val sc = new org.apache.spark.SparkContext(conf)
val lines = sc.textFile("hdfs://daas/daas_flows/trd-gw-2014-05-03.csv")
val csv = lines.map(x => x.split(",").map(x => x.trim))
val tuples = csv.map(x => (x(SrcIP), x(DestIP), x(DestPort)))
val v4only = tuples.filter(r => !r._1.contains(":") && !r._2.contains(":") && !r._3.contains("."))
val ip2id = (ip:String) => ip.split("\\.").map(c => c.toLong).reduceLeft((a,b) => a * 256 + b)
val id2ip = (id:Long) => s"${id>>24}.${(id>>16)&0xff}.${(id>>8)&0xff}.${id&0xff}"
val edges = v4only.map(r=>new Edge(ip2id(r._1), ip2id(r._2), r._3.toInt)).distinct()
val graph = Graph.fromEdges(edges, (0, Long.MaxValue))
val httpgraph = graph.subgraph(edge => edge.attr == 53)
val lengthgraph = httpgraph.pregel((0, Long.MaxValue), 10)(
(id, attr, newAttr) => (math.max(newAttr._1, attr._1), math.min(newAttr._2, attr._2)),
triplet => {
if (triplet.srcAttr._2 == triplet.dstAttr._2 && triplet.srcAttr._2 < Long.MaxValue) { // break cycles
Iterator.empty
}
else if (triplet.srcAttr._1 + 1 <= triplet.dstAttr._1 && triplet.srcAttr._2 > triplet.dstAttr._2) {
Iterator((triplet.srcId, (triplet.srcAttr._1, triplet.dstAttr._2)))
}
else if (triplet.srcAttr._1 + 1 >= triplet.dstAttr._1) {
val minNode: Long = math.min(math.min(triplet.srcAttr._2, triplet.srcId), triplet.dstAttr._2)
Iterator((triplet.dstId, (triplet.srcAttr._1 + 1, minNode)))
} else {
Iterator.empty
}
},
(a, b) => (math.max(a._1, b._1), math.min(a._2, b._2))
)
lengthgraph.cache()
val subgraphs = lengthgraph.vertices.filter(v => v._2._1 > 1).map(v => v._2._2).distinct().collect()
lengthgraph.subgraph(vpred = (id, attr) => subgraphs.contains(attr._2)).triplets.filter(triplet => triplet.srcAttr._1 + 1 == triplet.dstAttr._1).groupBy(triplet => triplet.dstAttr._2).map(
group => {
val sb = new StringBuilder
val sorted = group._2.toArray.sortBy(triplet => triplet.srcAttr._1)
sorted.map(triplet => triplet.srcAttr._1 + " " + triplet.dstAttr._1 + " " + triplet.srcAttr._2 + " " + triplet.dstAttr._2 + " " + id2ip(triplet.srcId) + " " + id2ip(triplet.dstId) + " " + triplet.attr).addString(sb, "cluster:\n ", "\n ", "\n")
sb
}
).collect.foreach(println(_))
//lengthgraph.vertices.map(v => (v._2, id2ip(v._1))).sortByKey(false).take(15).foreach(a => println(s"${a._1} ${a._2}"))
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment