Commit 2ab266c4 authored by yorn's avatar yorn

Implemented DOSRank

parent 08a455ff
package no.uninett.yorn.giraph.computation;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.formats.TextEdgeInputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class DOSRank extends
BasicComputation<IntWritable, IntWritable, NullWritable, LongWritable> {
private static final Charset ASCII = Charset.forName("ASCII");
static int ipToInt(String ip) {
String[] octetStrings = ip.trim().split("\\.");
byte[] octets = new byte[4];
for (int i = 0; i < 4; i++) {
octets[i] = (byte) Integer.parseInt(octetStrings[i]);
}
return octets[0] << 24 | octets[1] << 16 | octets[2] << 8 | octets[3];
}
@Override
public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
Iterable<LongWritable> messages) throws IOException {
vertex.setValue(new IntWritable(vertex.getNumEdges()));
vertex.voteToHalt();
}
public class NetflowCSVEdgeInputFormat extends
TextEdgeInputFormat<IntWritable, NullWritable> {
public class NetflowEdgeReader extends
TextEdgeReaderFromEachLineProcessed<String[]> {
public static final int DATE = 0;
public static final int SOURCE_IP = 1;
public static final int DEST_IP = 2;
public static final int PORT = 3;
/** {@inheritDoc} */
@Override
protected String[] preprocessLine(Text line) throws IOException {
return new String(line.getBytes(), ASCII).split(",");
}
/** {@inheritDoc} */
@Override
protected IntWritable getTargetVertexId(String[] line)
throws IOException {
return new IntWritable(ipToInt(line[SOURCE_IP]));
}
/** {@inheritDoc} */
@Override
protected IntWritable getSourceVertexId(String[] line)
throws IOException {
return new IntWritable(ipToInt(line[DEST_IP]));
}
/** {@inheritDoc} */
@Override
protected NullWritable getValue(String[] line) throws IOException {
return NullWritable.get();
}
}
public EdgeReader<IntWritable, NullWritable> createEdgeReader(
InputSplit split, TaskAttemptContext context)
throws IOException {
return new NetflowEdgeReader();
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment