Commit a7c4f13d authored by yorn's avatar yorn

Various fixes

parent 5571bd33
package no.uninett.yorn.giraph.computation;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collection;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.formats.TextEdgeInputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* This computation will rank nodes on the amount of outgoing flows.
* A higher amount of flows may indicate a (D)DoS attack,
* but it may also be a very active host.
*/
public class DOSRank extends
BasicComputation<IntWritable, IntWritable, NullWritable, LongWritable> {
BasicComputation<LongWritable, IntWritable, LongWritable, LongWritable> {
/*
* E is LongWritable for compatibility with SpreadRank, but edge value is
* not used
*/
private static final Charset ASCII = Charset.forName("ASCII");
static int ipToInt(String ip) {
String[] octetStrings = ip.trim().split("\\.");
byte[] octets = new byte[4];
for (int i = 0; i < 4; i++) {
octets[i] = (byte) Integer.parseInt(octetStrings[i]);
}
return octets[0] << 24 | octets[1] << 16 | octets[2] << 8 | octets[3];
}
static final LongWritable NULL_WRITABLE = new LongWritable(0);
@Override
public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
public void compute(Vertex<LongWritable, IntWritable, LongWritable> vertex,
Iterable<LongWritable> messages) throws IOException {
vertex.setValue(new IntWritable(vertex.getNumEdges()));
vertex.voteToHalt();
}
public class NetflowCSVEdgeInputFormat extends
TextEdgeInputFormat<IntWritable, NullWritable> {
public class NetflowEdgeReader extends
TextEdgeReaderFromEachLineProcessed<String[]> {
public static final int DATE = 0;
public static final int SOURCE_IP = 1;
public static final int DEST_IP = 2;
public static final int PORT = 3;
/** {@inheritDoc} */
@Override
protected String[] preprocessLine(Text line) throws IOException {
return new String(line.getBytes(), ASCII).split(",");
if (this.getSuperstep() == 0) {
this.sendMessageToAllEdges(vertex, NULL_WRITABLE);
} else {
int i;
if (messages instanceof Collection) {
i = ((Collection<?>) messages).size();
} else {
i = 0;
for (@SuppressWarnings("unused")
Object unused : messages) {
i++;
}
}
/** {@inheritDoc} */
@Override
protected IntWritable getTargetVertexId(String[] line)
throws IOException {
return new IntWritable(ipToInt(line[SOURCE_IP]));
}
/** {@inheritDoc} */
@Override
protected IntWritable getSourceVertexId(String[] line)
throws IOException {
return new IntWritable(ipToInt(line[DEST_IP]));
}
/** {@inheritDoc} */
@Override
protected NullWritable getValue(String[] line) throws IOException {
return NullWritable.get();
}
}
public EdgeReader<IntWritable, NullWritable> createEdgeReader(
InputSplit split, TaskAttemptContext context)
throws IOException {
return new NetflowEdgeReader();
vertex.setValue(new IntWritable(i));
vertex.voteToHalt();
}
}
}
package no.uninett.yorn.giraph.computation;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import no.uninett.yorn.giraph.format.io.NetflowCSVEdgeInputFormat;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.formats.TextEdgeInputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* Algorithm to rank IP addresses/port number pairs on how far traffic they
* generate spreads in the network. It uses input data from
* {@link NetflowCSVEdgeInputFormat}.
*/
public class SpreadRank extends
BasicComputation<IntWritable, IntWritable, LongWritable, LongWritable> {
BasicComputation<LongWritable, IntWritable, LongWritable, LongWritable> {
private static final long MASK_WEIGHT = 0xFFFF000000000000L;
private static final long MASK_TIME = 0x0000FFFFFFFFFFFFL;
private static final long MASK_TIME = 0x000000FFFFFFFFFFFFL;
private static final long STEP_WEIGHT = 0x0001000000000000L;
private static final Charset ASCII = Charset.forName("ASCII");
private static final DateFormat DATE_FORMAT = new SimpleDateFormat(
"YYYY-MM-dd hh:mm:ss.SSS");
static int ipToInt(String ip) {
String[] octetStrings = ip.trim().split("\\.");
byte[] octets = new byte[4];
for (int i = 0; i < 4; i++) {
octets[i] = (byte) Integer.parseInt(octetStrings[i]);
}
return octets[0] << 24 | octets[1] << 16 | octets[2] << 8 | octets[3];
}
static Edge<IntWritable, LongWritable> getLastEdgeBefore(long time,
Iterable<Edge<IntWritable, LongWritable>> edges) {
static Edge<LongWritable, LongWritable> getLastEdgeBefore(long time,
Iterable<Edge<LongWritable, LongWritable>> edges) {
long highest = Long.MIN_VALUE;
Edge<IntWritable, LongWritable> edge = null;
for (Edge<IntWritable, LongWritable> e : edges) {
Edge<LongWritable, LongWritable> edge = null;
for (Edge<LongWritable, LongWritable> e : edges) {
long candidate = e.getValue().get();
if (candidate < time && candidate > highest) {
highest = candidate;
......@@ -52,10 +37,10 @@ public class SpreadRank extends
}
@Override
public void compute(Vertex<IntWritable, IntWritable, LongWritable> vertex,
public void compute(Vertex<LongWritable, IntWritable, LongWritable> vertex,
Iterable<LongWritable> messages) throws IOException {
if (this.getSuperstep() == 0) {
for (Edge<IntWritable, LongWritable> e : vertex.getEdges()) {
for (Edge<LongWritable, LongWritable> e : vertex.getEdges()) {
sendMessage(e.getTargetVertexId(), new LongWritable(e
.getValue().get()));
}
......@@ -79,59 +64,4 @@ public class SpreadRank extends
}
public class NetflowCSVEdgeInputFormat extends
TextEdgeInputFormat<LongWritable, LongWritable> {
public class NetflowEdgeReader extends
TextEdgeReaderFromEachLineProcessed<String[]> {
public static final int DATE = 0;
public static final int SOURCE_IP = 1;
public static final int DEST_IP = 2;
public static final int PORT = 3;
/** {@inheritDoc} */
@Override
protected String[] preprocessLine(Text line) throws IOException {
return new String(line.getBytes(), ASCII).split(",");
}
/** {@inheritDoc} */
@Override
protected LongWritable getTargetVertexId(String[] line)
throws IOException {
int port = Integer.parseInt(line[PORT]);
/* We switch around source and destination! */
return new LongWritable(port << 32 | ipToInt(line[SOURCE_IP]));
}
/** {@inheritDoc} */
@Override
protected LongWritable getSourceVertexId(String[] line)
throws IOException {
int port = Integer.parseInt(line[PORT]);
/* We switch around source and destination! */
return new LongWritable(port << 32 | ipToInt(line[DEST_IP]));
}
/** {@inheritDoc} */
@Override
protected LongWritable getValue(String[] line) throws IOException {
try {
Date terminationDate = DATE_FORMAT.parse(line[DATE]);
return new LongWritable(terminationDate.getTime()
& MASK_TIME);
} catch (Exception e) {
throw new IOException(e);
}
}
}
public EdgeReader<LongWritable, LongWritable> createEdgeReader(
InputSplit split, TaskAttemptContext context)
throws IOException {
return new NetflowEdgeReader();
}
}
}
package no.uninett.yorn.giraph.format.io;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.formats.TextEdgeInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* Input format for NetFlow data converted using the FlowConvert.sh command. It
* will switch around the source and destination IPs, because of the way
* SpreadRank works.
*
* SpreadRank works on the premise that Giraph messages are sent in the opposite
* direction from the direction of the flows.
*/
public class NetflowCSVEdgeInputFormat extends
TextEdgeInputFormat<LongWritable, LongWritable> {
private static final long MASK_TIME = 0x0000FFFFFFFFFFFFL;
private static final Charset ASCII = Charset.forName("ASCII");
private static final DateFormat DATE_FORMAT = new SimpleDateFormat(
"YYYY-MM-dd hh:mm:ss.SSS");
static int ipToInt(String ip) {
String[] octetStrings = ip.trim().split("\\.");
byte[] octets = new byte[4];
for (int i = 0; i < 4; i++) {
octets[i] = (byte) Integer.parseInt(octetStrings[i]);
}
return octets[0] << 24 | octets[1] << 16 | octets[2] << 8 | octets[3];
}
public class NetflowEdgeReader extends
TextEdgeReaderFromEachLineProcessed<String[]> {
public static final int DATE = 0;
public static final int SOURCE_IP = 1;
public static final int DEST_IP = 2;
public static final int PORT = 3;
/** {@inheritDoc} */
@Override
protected String[] preprocessLine(Text line) throws IOException {
return new String(line.getBytes(), ASCII).split(",");
}
/** {@inheritDoc} */
@Override
protected LongWritable getTargetVertexId(String[] line)
throws IOException {
int port = Integer.parseInt(line[PORT]);
/* We switch around source and destination IP! */
return new LongWritable(port << 32 | ipToInt(line[SOURCE_IP]));
}
/** {@inheritDoc} */
@Override
protected LongWritable getSourceVertexId(String[] line)
throws IOException {
int port = Integer.parseInt(line[PORT]);
/* We switch around source and destination IP! */
return new LongWritable(port << 32 | ipToInt(line[DEST_IP]));
}
/** {@inheritDoc} */
@Override
protected LongWritable getValue(String[] line) throws IOException {
try {
Date terminationDate = DATE_FORMAT.parse(line[DATE]);
return new LongWritable(terminationDate.getTime() & MASK_TIME);
} catch (Exception e) {
throw new IOException(e);
}
}
}
public EdgeReader<LongWritable, LongWritable> createEdgeReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new NetflowEdgeReader();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment