Commit cea592c6 authored by yorn's avatar yorn

Implemented SpreadRank

parent 9e03014a
#!/bin/bash
nfdump -o 'fmt:%ts %te %sa %da %pr %sp %dp %sas %das %in %out %tos %flg' | while read flowstartdate flowstarttime flowenddate flowendtime srcaddr dstaddr proto srcpt dstpt REST
do
if [ "${proto:0:3}" == TCP ]
then
if [ "$srcpt" -lt 1024 -a "$dstpt" -ge 1024 -o "$srcpt" -ge 1024 -a "$dstpt" -lt 1024 ]
then
[ $srcpt -lt $dstpt ] && echo $flowstartdate\ $flowstarttime,$srcaddr,$dstaddr,$srcpt || echo $flowstartdate\ $flowstarttime,$dstaddr,$srcaddr,$dstpt
fi
fi
if [ "${proto:0:3}" == UDP ]
then
if [ "$srcpt" == 19 -o "$srcpt" == 53 -o "$srcpt" == 123 ]
then
echo $flowstartdate\ $flowstarttime,$srcaddr,$dstaddr,$srcpt
elif [ "$dstpt" == 19 -o "$dstpt" == 53 -o "$dstpt" == 123 ]
then
echo $flowstartdate\ $flowstarttime,$dstaddr,$srcaddr,$dstpt
fi
fi
done
package no.uninett.yorn.giraph.computation;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.formats.TextEdgeInputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class SpreadRank extends
BasicComputation<IntWritable, IntWritable, LongWritable, LongWritable> {
private static final long MASK_WEIGHT = 0xFFFF000000000000L;
private static final long MASK_TIME = 0x0000FFFFFFFFFFFFL;
private static final long STEP_WEIGHT = 0x0001000000000000L;
private static final Charset ASCII = Charset.forName("ASCII");
private static final DateFormat DATE_FORMAT = new SimpleDateFormat(
"YYYY-MM-dd hh:mm:ss.SSS");
static int ipToInt(String ip) {
String[] octetStrings = ip.trim().split("\\.");
byte[] octets = new byte[4];
for (int i = 0; i < 4; i++) {
octets[i] = (byte) Integer.parseInt(octetStrings[i]);
}
return octets[0] << 24 | octets[1] << 16 | octets[2] << 8 | octets[3];
}
@Override
public void compute(Vertex<IntWritable, IntWritable, LongWritable> vertex,
Iterable<LongWritable> messages) throws IOException {
if (this.getSuperstep() == 0) {
for (Edge<IntWritable, LongWritable> e : vertex.getEdges()) {
sendMessage(e.getTargetVertexId(), new LongWritable(e
.getValue().get()));
}
} else {
int currentValue = vertex.getValue().get();
for (LongWritable m : messages) {
long l = m.get();
long messageWeight = (l + STEP_WEIGHT) & MASK_WEIGHT;
long messageTime = l & MASK_TIME;
int value = (int) (messageWeight >> 48);
for (Edge<IntWritable, LongWritable> e : vertex.getEdges()) {
if (e.getValue().get() > messageTime) {
sendMessage(e.getTargetVertexId(), new LongWritable(messageWeight));
break;
}
}
if (value > currentValue) {
currentValue = value;
}
}
vertex.setValue(new IntWritable(currentValue));
vertex.voteToHalt();
}
}
public class NetflowCSVEdgeInputFormat extends
TextEdgeInputFormat<LongWritable, LongWritable> {
public class NetflowEdgeReader extends
TextEdgeReaderFromEachLineProcessed<String[]> {
public static final int DATE = 0;
public static final int SOURCE_IP = 1;
public static final int DEST_IP = 2;
public static final int PORT = 3;
/** {@inheritDoc} */
@Override
protected String[] preprocessLine(Text line) throws IOException {
return new String(line.getBytes(), ASCII).split(",");
}
/** {@inheritDoc} */
@Override
protected LongWritable getTargetVertexId(String[] line)
throws IOException {
int port = Integer.parseInt(line[PORT]);
/* We switch around source and destination! */
return new LongWritable(port << 32 | ipToInt(line[SOURCE_IP]));
}
/** {@inheritDoc} */
@Override
protected LongWritable getSourceVertexId(String[] line)
throws IOException {
int port = Integer.parseInt(line[PORT]);
/* We switch around source and destination! */
return new LongWritable(port << 32 | ipToInt(line[DEST_IP]));
}
/** {@inheritDoc} */
@Override
protected LongWritable getValue(String[] line) throws IOException {
try {
Date terminationDate = DATE_FORMAT.parse(line[DATE]);
return new LongWritable(terminationDate.getTime() & MASK_TIME);
} catch (Exception e) {
throw new IOException(e);
}
}
}
public EdgeReader<LongWritable, LongWritable> createEdgeReader(
InputSplit split, TaskAttemptContext context)
throws IOException {
return new NetflowEdgeReader();
}
}
}
package no.uninett.yorn.giraph.io.formats;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.formats.TextEdgeInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class NetflowCSVEdgeInputFormat extends TextEdgeInputFormat<LongWritable,LongWritable> {
private static final Charset ASCII = Charset.forName("ASCII");
private static final DateFormat DATE_FORMAT = new SimpleDateFormat("YYYY-MM-dd hh:mm:ss.SSS");
private static enum Protocol { TCP(0), UDP(1), GRE(2), ESP(3), ICMP(4), IPv6(5), PIM(6);
public final byte id;
Protocol(int id) {this.id = (byte) id;}}
private static long ipToLong(String ip) {
String[] octetStrings = ip.trim().split("\\.");
byte[] octets = new byte[4];
for(int i=0;i<4;i++) {
octets[i] = (byte) Integer.parseInt(octetStrings[i]);
}
return octets[0] << 24 | octets[1] << 16 | octets[2] << 8 | octets[3];
}
public class NetflowEdgeReader extends
TextEdgeReaderFromEachLineProcessed<String[]> {
/** {@inheritDoc} */
@Override
protected String[] preprocessLine(Text line) throws IOException {
return new String(line.getBytes(), ASCII).split(",");
}
/** {@inheritDoc} */
@Override
protected LongWritable getTargetVertexId(String[] line)
throws IOException {
return new LongWritable(ipToLong(line[2]));
}
/** {@inheritDoc} */
@Override
protected LongWritable getSourceVertexId(String[] line)
throws IOException {
return new LongWritable(ipToLong(line[3]));
}
/** {@inheritDoc} */
@Override
protected LongWritable getValue(String[] line) throws IOException {
try {
Date terminationDate = DATE_FORMAT.parse(line[1]);
int srcPort = Integer.parseInt(line[5]);
int dstPort = Integer.parseInt(line[6]);
Protocol protocol = Protocol.valueOf(Protocol.class, line[4]);
return new LongWritable(
srcPort << 48
| dstPort << 32
| (7 & protocol.id) << 29
| (0x1FFFL & terminationDate.getTime()));
} catch (Exception e) {
throw new IOException(e);
}
}}
@Override
public EdgeReader<LongWritable, LongWritable> createEdgeReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new NetflowEdgeReader();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment