Commit 20a41c71 authored by Sigmund Augdal's avatar Sigmund Augdal

Checking in graph app version that actually works

parent e6043a65
......@@ -24,36 +24,38 @@ object GraphApp
val ip2id = (ip:String) => ip.split("\\.").map(c => c.toLong).reduceLeft((a,b) => a * 256 + b)
val id2ip = (id:Long) => s"${id>>24}.${(id>>16)&0xff}.${(id>>8)&0xff}.${id&0xff}"
val edges = v4only.map(r=>new Edge(ip2id(r._1), ip2id(r._2), r._3.toInt)).distinct()
val graph = Graph.fromEdges(edges, (0, Long.MaxValue))
val httpgraph = graph.subgraph(edge => edge.attr == 53)
val lengthgraph = httpgraph.pregel((0, Long.MaxValue), 10)(
(id, attr, newAttr) => (math.max(newAttr._1, attr._1), math.min(newAttr._2, attr._2)),
val graph = Graph.fromEdges(edges, Array[Long]())
val httpgraph = graph.subgraph(edge => edge.attr == 179)
val lengthgraph = httpgraph.pregel(Array[Long](), 75)(
(id, attr, newAttr) => newAttr :+ id,
triplet => {
if (triplet.srcAttr._2 == triplet.dstAttr._2 && triplet.srcAttr._2 < Long.MaxValue) { // break cycles
Iterator.empty
}
else if (triplet.srcAttr._1 + 1 <= triplet.dstAttr._1 && triplet.srcAttr._2 > triplet.dstAttr._2) {
Iterator((triplet.srcId, (triplet.srcAttr._1, triplet.dstAttr._2)))
}
else if (triplet.srcAttr._1 + 1 >= triplet.dstAttr._1) {
val minNode: Long = math.min(math.min(triplet.srcAttr._2, triplet.srcId), triplet.dstAttr._2)
Iterator((triplet.dstId, (triplet.srcAttr._1 + 1, minNode)))
if (triplet.srcAttr.length >= triplet.dstAttr.length && !triplet.srcAttr.contains(triplet.dstId)) {
Iterator((triplet.dstId, triplet.srcAttr))
} else {
Iterator.empty
}
},
(a, b) => (math.max(a._1, b._1), math.min(a._2, b._2))
(a, b) => {
if (a.length > b.length) {
a
} else if (b.length > a.length){
b
} else if (a(0) < b(0)) {
a
} else {
b
}
}
)
lengthgraph.cache()
val subgraphs = lengthgraph.vertices.filter(v => v._2._1 > 1).map(v => v._2._2).distinct().collect()
lengthgraph.subgraph(vpred = (id, attr) => subgraphs.contains(attr._2)).triplets.filter(triplet => triplet.srcAttr._1 + 1 == triplet.dstAttr._1).groupBy(triplet => triplet.dstAttr._2).map(
group => {
val candidates = lengthgraph.vertices.filter(v => v._2.length > 2)
val results = candidates.map(v => (v._2(0), v._2)).reduceByKey((a, b) => { if (a.length > b.length) { a } else { b } }).map(a=>a._2)
results.map(
path => {
val sb = new StringBuilder
val sorted = group._2.toArray.sortBy(triplet => triplet.srcAttr._1)
sorted.map(triplet => triplet.srcAttr._1 + " " + triplet.dstAttr._1 + " " + triplet.srcAttr._2 + " " + triplet.dstAttr._2 + " " + id2ip(triplet.srcId) + " " + id2ip(triplet.dstId) + " " + triplet.attr).addString(sb, "cluster:\n ", "\n ", "\n")
path.map(id => id2ip(id)).addString(sb, "Chain length " + path.length + ": ", " -> ", "")
sb
}
).collect.foreach(println(_))
//lengthgraph.vertices.map(v => (v._2, id2ip(v._1))).sortByKey(false).take(15).foreach(a => println(s"${a._1} ${a._2}"))
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment