Commit 00a762a0 authored by yorn's avatar yorn

Changed loop-prevention

Instead of blaming the last incoming connection, blame every earlier
connection exactly once, by removing all later occurrences.
parent bb974f7a
package no.uninett.yorn.giraph.computation;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import no.uninett.yorn.giraph.format.io.NetflowCSVEdgeInputFormat;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.MutableEdge;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.IntWritable;
......@@ -22,24 +25,11 @@ public class SpreadRank extends
private static final long MASK_TIME = 0x000000FFFFFFFFFFFFL;
private static final long STEP_WEIGHT = 0x0001000000000000L;
static Edge<LongWritable, LongWritable> getLastEdgeBefore(long time,
Iterable<Edge<LongWritable, LongWritable>> edges) {
long highest = Long.MIN_VALUE;
Edge<LongWritable, LongWritable> edge = null;
for (Edge<LongWritable, LongWritable> e : edges) {
long candidate = e.getValue().get();
if (candidate < time && candidate > highest) {
highest = candidate;
edge = e;
}
}
return edge;
}
@Override
public void compute(Vertex<LongWritable, IntWritable, LongWritable> vertex,
Iterable<LongWritable> messages) throws IOException {
if (this.getSuperstep() == 0) {
thinOut(vertex.getMutableEdges());
for (Edge<LongWritable, LongWritable> e : vertex.getEdges()) {
sendMessage(e.getTargetVertexId(), new LongWritable(e
.getValue().get()));
......@@ -51,17 +41,42 @@ public class SpreadRank extends
long messageWeight = (l + STEP_WEIGHT) & MASK_WEIGHT;
long messageTime = l & MASK_TIME;
int value = (int) (messageWeight >> 48);
sendMessage(getLastEdgeBefore(messageTime, vertex.getEdges())
.getTargetVertexId(), new LongWritable(messageWeight));
if (value > currentValue) {
currentValue = value;
}
for (Edge<LongWritable, LongWritable> e : vertex.getEdges()) {
if (e.getValue().get() < messageTime) {
sendMessage(e.getTargetVertexId(), new LongWritable(
messageWeight | e.getValue().get()));
}
}
}
vertex.setValue(new IntWritable(currentValue));
vertex.voteToHalt();
}
}
static void thinOut(Iterable<MutableEdge<LongWritable, LongWritable>> edges) {
HashMap<Long, Long> observed = new HashMap<Long, Long>();
Long theirs;
Long ours;
for (MutableEdge<LongWritable, LongWritable> e : edges) {
theirs = e.getValue().get();
ours = observed.get(e.getTargetVertexId().get());
if (ours == null || theirs < ours) {
observed.put(Long.valueOf(e.getTargetVertexId().get()),
Long.valueOf(e.getValue().get()));
}
}
MutableEdge<LongWritable, LongWritable> current;
Iterator<MutableEdge<LongWritable, LongWritable>> i = edges.iterator();
while (i.hasNext()) {
current = i.next();
if (!Long.valueOf(current.getValue().get()).equals(
observed.get(current.getTargetVertexId().get()))) {
i.remove();
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment