Commit 2f439030 authored by yorn

Final changes for thesis

parent 46e243da
# IP Ranking algorithms for NetFlow traffic
This project contains Giraph jobs to find IP addresses from NetFlow data that may be spreading harmful traffic.
This project contains Giraph jobs to find IP addresses from NetFlow data that are spreading traffic.
NetFlow data is converted and filtered to a CSV file using `FlowConvert.sh`. It can then be imported using `NetflowCSVEdgeInputFormat` and run through `DOSRank` and `SpreadRank`.
\ No newline at end of file
NetFlow data is converted and filtered to a CSV file using `FlowConvert.sh`. It can then be imported using `NetflowCSVEdgeInputFormat` and run through `DOSRank` and `SpreadRank`.
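A quick way to see the whole pipeline run is Giraph's in-process test runner. The sketch below is only an illustration of that wiring, not code from this repository: it assumes `giraph-core`'s `InternalVertexRunner` is on the classpath, that the project classes can be imported (their package names are only partly visible in this diff), and that the sample lines match the `timestamp,server,client,port` layout produced by `FlowConvert.sh`.

```java
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.utils.InternalVertexRunner;

// Hypothetical local driver; SpreadRank, NetflowCSVEdgeInputFormat and
// RankVertexOutputFormat are assumed to be importable from this project.
public class SpreadRankLocalDriver {
    public static void main(String[] args) throws Exception {
        // Edge lines shaped like FlowConvert.sh output: "start time,server,client,port".
        String[] edges = {
            "2014-05-01 10:00:00.000,192.0.2.1,198.51.100.7,53",
            "2014-05-01 10:00:01.000,192.0.2.1,198.51.100.8,53",
        };

        GiraphConfiguration conf = new GiraphConfiguration();
        conf.setComputationClass(SpreadRank.class);
        conf.setEdgeInputFormatClass(NetflowCSVEdgeInputFormat.class);
        conf.setVertexOutputFormatClass(RankVertexOutputFormat.class);

        // Runs master and worker in one JVM; each returned string is one output line.
        for (String line : InternalVertexRunner.run(conf, null, edges)) {
            System.out.println(line);
        }
    }
}
```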
#!/bin/bash
nfdump -o 'fmt:%ts %te %sa %da %pr %sp %dp %sas %das %in %out %tos %flg' | while read flowstartdate flowstarttime flowenddate flowendtime srcaddr dstaddr proto srcpt dstpt REST
nfdump "$@" -o 'fmt:%ts %te %sa %da %pr %sp %dp %sas %das %in %out %tos %flg' | while read flowstartdate flowstarttime flowenddate flowendtime srcaddr dstaddr proto srcpt dstpt REST
do
if [ "${proto:0:3}" == TCP ]
if [ "${proto}" = "TCP" -o "${proto}" = "UDP" ]
then
if [ "$srcpt" -lt 1024 -a "$dstpt" -ge 1024 -o "$srcpt" -ge 1024 -a "$dstpt" -lt 1024 ]
then
[ $srcpt -lt $dstpt ] && echo $flowstartdate\ $flowstarttime,$srcaddr,$dstaddr,$srcpt || echo $flowstartdate\ $flowstarttime,$dstaddr,$srcaddr,$dstpt
fi
fi
if [ "${proto:0:3}" == UDP ]
then
if [ "$srcpt" == 19 -o "$srcpt" == 53 -o "$srcpt" == 123 ]
then
echo $flowstartdate\ $flowstarttime,$srcaddr,$dstaddr,$srcpt
elif [ "$dstpt" == 19 -o "$dstpt" == 53 -o "$dstpt" == 123 ]
then
echo $flowstartdate\ $flowstarttime,$dstaddr,$srcaddr,$dstpt
[ $srcpt -lt $dstpt ] && echo $flowstartdate\ $flowstarttime,$dstaddr,$srcaddr,$srcpt || echo $flowstartdate\ $flowstarttime,$srcaddr,$dstaddr,$dstpt
fi
fi
done
@@ -3,7 +3,6 @@ package no.uninett.yorn.giraph.computation;
import java.io.IOException;
import java.util.Collection;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.IntWritable;
@@ -32,10 +31,7 @@ public class ReverseDOSRank extends
public void compute(Vertex<LongWritable, IntWritable, LongWritable> vertex,
Iterable<LongWritable> messages) throws IOException {
if (this.getSuperstep() == 0) {
//sendMessageToAllEdges(vertex, NULL_WRITABLE);
for (Edge<LongWritable, LongWritable> e : vertex.getEdges()) {
sendMessage(e.getTargetVertexId(), NULL_WRITABLE);
}
sendMessageToAllEdges(vertex, NULL_WRITABLE);
} else {
int i;
if (messages instanceof Collection) {
@@ -10,7 +10,6 @@ import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.MutableEdge;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
/**
@@ -18,44 +17,40 @@ import org.apache.hadoop.io.LongWritable;
* generate spreads in the network. It uses input data from
* {@link NetflowCSVEdgeInputFormat}.
*/
public class SpreadRank extends
BasicComputation<LongWritable, IntWritable, LongWritable, LongWritable> {
private static final long MASK_WEIGHT = 0x7FFF000000000000L;
private static final long MASK_TIME = 0x000000FFFFFFFFFFFFL;
private static final long STEP_WEIGHT = 0x0001000000000000L;
public class SpreadRank
extends
BasicComputation<LongWritable, LongWritable, LongWritable, LongWritable> {
@Override
public void compute(Vertex<LongWritable, IntWritable, LongWritable> vertex,
public void compute(
Vertex<LongWritable, LongWritable, LongWritable> vertex,
Iterable<LongWritable> messages) throws IOException {
if (this.getSuperstep() == 0) {
if (getSuperstep() == 0) {
thinOut(vertex.getMutableEdges());
for (Edge<LongWritable, LongWritable> e : vertex.getEdges()) {
sendMessage(e.getTargetVertexId(), new LongWritable(e
.getValue().get()));
}
} else if (this.getSuperstep() == 64) {
vertex.voteToHalt();
} else {
int currentValue = vertex.getValue().get();
vertex.setValue(new LongWritable(vertex.getNumEdges()));
} else if (getSuperstep() < 200) {
long currentValue = vertex.getValue().get() & 0x00FFFFFFFFFFFFFFL;
long messageTime = 0;
for (LongWritable m : messages) {
long l = m.get();
long messageWeight = (l + STEP_WEIGHT) & MASK_WEIGHT;
long messageTime = l & MASK_TIME;
int value = (int) (messageWeight >> 48);
if (value > currentValue) {
currentValue = value;
currentValue++;
if (messageTime < m.get()) {
messageTime = m.get();
}
for (Edge<LongWritable, LongWritable> e : vertex.getEdges()) {
if (e.getValue().get() < messageTime) {
sendMessage(e.getTargetVertexId(), new LongWritable(
messageWeight | e.getValue().get()));
}
}
for (Edge<LongWritable, LongWritable> e : vertex.getEdges()) {
if (e.getValue().get() < messageTime) {
sendMessage(e.getTargetVertexId(), new LongWritable(e
.getValue().get()));
}
}
vertex.setValue(new IntWritable(currentValue));
vertex.voteToHalt();
vertex.setValue(new LongWritable(getSuperstep() << 56
| currentValue));
}
vertex.voteToHalt();
}
static void thinOut(Iterable<MutableEdge<LongWritable, LongWritable>> edges) {
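With this commit SpreadRank keeps its whole vertex state in one long: the superstep at which the vertex last grew its spread count (its depth in the spread) is written into the top byte with `getSuperstep() << 56`, while the count itself stays in the lower 56 bits behind the `0x00FFFFFFFFFFFFFFL` mask; `RankVertexOutputFormat` unpacks the same two fields. A minimal sketch of that packing, using hypothetical helper names:

```java
// Hypothetical helpers mirroring the depth/count packing used by SpreadRank
// and unpacked again in RankVertexOutputFormat.
final class SpreadValueCodec {
    static final long COUNT_MASK = 0x00FFFFFFFFFFFFFFL; // lower 56 bits: spread count
    static final int DEPTH_SHIFT = 56;                  // top byte: superstep/depth

    static long pack(long superstep, long count) {
        return (superstep << DEPTH_SHIFT) | (count & COUNT_MASK);
    }

    // Signed shift, so depths up to 127 decode cleanly.
    static long depth(long value) {
        return value >> DEPTH_SHIFT;
    }

    static long count(long value) {
        return value & COUNT_MASK;
    }

    public static void main(String[] args) {
        long v = pack(3, 1234);
        System.out.println(depth(v) + " " + count(v)); // prints "3 1234"
    }
}
```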
@@ -24,8 +24,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class NetflowCSVEdgeInputFormat extends
TextEdgeInputFormat<LongWritable, LongWritable> {
private static final long MASK_TIME = 0x0000FFFFFFFFFFFFL;
private static final DateFormat DATE_FORMAT = new SimpleDateFormat(
"YYYY-MM-dd hh:mm:ss.SSS");
@@ -76,7 +74,7 @@ public class NetflowCSVEdgeInputFormat extends
protected LongWritable getValue(String[] line) throws IOException {
try {
Date date = DATE_FORMAT.parse(line[DATE]);
return new LongWritable(date.getTime() & MASK_TIME);
return new LongWritable(date.getTime());
} catch (ParseException e) {
throw new IOException(e);
}
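After this change the edge value is simply the flow start time in epoch milliseconds (the 48-bit mask is gone). A stand-alone sketch of that parsing step; it uses the `yyyy`/`HH` pattern letters, which is an assumption about the intended timestamp layout, whereas the committed format string spells them `YYYY`/`hh`:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class FlowTimestampDemo {
    public static void main(String[] args) throws ParseException {
        // Assumed shape of nfdump's %ts column as emitted by FlowConvert.sh.
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        long startMillis = fmt.parse("2014-05-01 10:00:00.000").getTime();
        System.out.println(startMillis); // epoch milliseconds, used as the edge value
    }
}
```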
@@ -4,12 +4,11 @@ import java.io.IOException;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.TextVertexOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class RankVertexOutputFormat extends TextVertexOutputFormat<LongWritable, IntWritable, LongWritable> {
public class RankVertexOutputFormat extends TextVertexOutputFormat<LongWritable, LongWritable, LongWritable> {
static String intToIp(int ip) {
StringBuilder octetStrings = new StringBuilder();
@@ -27,18 +26,20 @@ public class RankVertexOutputFormat extends TextVertexOutputFormat<LongWritable,
@Override
protected Text convertVertexToLine(
Vertex<LongWritable, IntWritable, LongWritable> vertex)
Vertex<LongWritable, LongWritable, LongWritable> vertex)
throws IOException {
long vertexId = vertex.getId().get();
String ip = intToIp((int) vertexId);
long port = vertexId >> 32;
return new Text(ip+':'+port+'\t'+vertex.getValue().get());
int port = (int) (vertexId >> 32);
long spreading = (int) vertex.getValue().get() & 0x00FFFFFFFFFFFFFFL;
int depth = (int) (vertex.getValue().get() >> 56);
return new Text(ip+':'+port+'\t'+vertex.getNumEdges()+'\t'+spreading+'\t'+depth);
}
}
@Override
public org.apache.giraph.io.formats.TextVertexOutputFormat<LongWritable, IntWritable, LongWritable>.TextVertexWriterToEachLine createVertexWriter(
public org.apache.giraph.io.formats.TextVertexOutputFormat<LongWritable, LongWritable, LongWritable>.TextVertexWriterToEachLine createVertexWriter(
TaskAttemptContext context) throws IOException,
InterruptedException {
return new RankVertexWriter();
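For reference, the output format packs an IPv4 address into the low 32 bits of the vertex id and the port into the bits above it, then emits one tab-separated line per vertex: `ip:port`, out-degree, spread count and depth. A hedged round-trip sketch of that id layout (helper names are hypothetical, and the byte order of the dotted quad is an assumption since `intToIp` is collapsed in this diff):

```java
// Hypothetical helpers mirroring RankVertexOutputFormat's vertex-id layout.
final class VertexIdCodec {
    /** Pack an IPv4 address (as a 32-bit int) and a port into one long id. */
    static long encode(int ipv4, int port) {
        return ((long) port << 32) | (ipv4 & 0xFFFFFFFFL);
    }

    /** Dotted-quad rendering of the low 32 bits (byte order assumed). */
    static String toIp(long id) {
        int ip = (int) id;
        return ((ip >> 24) & 0xFF) + "." + ((ip >> 16) & 0xFF) + "."
                + ((ip >> 8) & 0xFF) + "." + (ip & 0xFF);
    }

    static int toPort(long id) {
        return (int) (id >> 32);
    }

    public static void main(String[] args) {
        long id = encode(0xC0000201, 53);                 // 192.0.2.1, port 53
        System.out.println(toIp(id) + ":" + toPort(id));  // prints "192.0.2.1:53"
    }
}
```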