Commit 9e03014a authored by yorn's avatar yorn

Initial commit

parents
bin
target
# NetFlow Giraph job
This project contains a Giraph edge input format that reads CSV files converted from NetFlow records.
\ No newline at end of file
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>no.uninett.yorn.giraph</groupId>
  <artifactId>core</artifactId>
  <version>Git-SNAPSHOT</version>
  <build>
    <!-- Sources live directly under src/ rather than the Maven default src/main/java. -->
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <!-- Pin the language level explicitly instead of leaving empty placeholders.
               The sources call java.io.IOException(Throwable), which needs Java 6. -->
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <!-- giraph-core is the jar that provides org.apache.giraph.io.*;
         giraph-parent is only the pom-packaged aggregator and has no jar to resolve. -->
    <dependency>
      <groupId>org.apache.giraph</groupId>
      <artifactId>giraph-core</artifactId>
      <version>1.1.0-SNAPSHOT</version>
    </dependency>
    <!-- Declared once: the former second hadoop-core entry used "javadoc",
         which is not a valid Maven dependency scope. -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.1</version>
    </dependency>
  </dependencies>
</project>
\ No newline at end of file
package no.uninett.yorn.giraph.io.formats;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.formats.TextEdgeInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
 * Giraph edge input format that turns one comma-separated NetFlow record per
 * line into one edge.
 *
 * <p>Vertex ids are IPv4 addresses packed into the low 32 bits of a
 * {@code long}. The edge value packs four fields into one {@code long}:
 *
 * <pre>
 *   bits 63..48  source port
 *   bits 47..32  destination port
 *   bits 31..29  protocol id (see {@link Protocol})
 *   bits 28..0   low 29 bits of the flow end time, in ms since the epoch
 * </pre>
 *
 * <p>Columns read from each record (zero-based): 1 = end time, 2 and 3 = IP
 * addresses, 4 = protocol name, 5 = source port, 6 = destination port.
 */
public class NetflowCSVEdgeInputFormat extends TextEdgeInputFormat<LongWritable, LongWritable> {

  private static final Charset ASCII = Charset.forName("ASCII");

  /**
   * Timestamp layout of the end-time column. Uses calendar year ({@code yyyy})
   * and 24-hour clock ({@code HH}); the pattern carries no AM/PM marker, so a
   * 12-hour field would map 12:xx onto 00:xx.
   */
  private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";

  private static final int COL_END_TIME = 1;
  private static final int COL_TARGET_IP = 2;
  private static final int COL_SOURCE_IP = 3;
  private static final int COL_PROTOCOL = 4;
  private static final int COL_SRC_PORT = 5;
  private static final int COL_DST_PORT = 6;

  private static final int SRC_PORT_SHIFT = 48;
  private static final int DST_PORT_SHIFT = 32;
  private static final int PROTOCOL_SHIFT = 29;
  private static final long PORT_MASK = 0xFFFFL;
  private static final long PROTOCOL_MASK = 0x7L;
  /**
   * Selects every bit below the protocol field (29 bits).
   * NOTE(review): the mask used to be 0x1FFF (13 bits), which left bits 13..28
   * permanently zero; widened to fill the slot. Confirm the intended width.
   */
  private static final long TIME_MASK = (1L << PROTOCOL_SHIFT) - 1;

  /** Protocol names accepted in the protocol column, each with a 3-bit id. */
  private enum Protocol {
    TCP(0), UDP(1), GRE(2), ESP(3), ICMP(4), IPv6(5), PIM(6);

    public final byte id;

    Protocol(int id) {
      this.id = (byte) id;
    }
  }

  /**
   * Packs a dotted-quad IPv4 address into the low 32 bits of a {@code long}.
   *
   * <p>The arithmetic is done on {@code long} with unsigned octet values, so
   * octets of 128 and above do not sign-extend into the higher bits.
   *
   * @param ip address such as {@code "192.168.1.1"}; surrounding whitespace is ignored
   * @return the address as a non-negative number, first octet most significant
   * @throws IllegalArgumentException if {@code ip} is not four decimal octets
   *     in the range 0..255
   */
  private static long ipToLong(String ip) {
    String[] octetStrings = ip.trim().split("\\.");
    if (octetStrings.length != 4) {
      throw new IllegalArgumentException("Not a dotted-quad IPv4 address: " + ip);
    }
    long address = 0L;
    for (String octetString : octetStrings) {
      int octet = Integer.parseInt(octetString);
      if (octet < 0 || octet > 255) {
        throw new IllegalArgumentException("IPv4 octet out of range in: " + ip);
      }
      address = (address << 8) | octet;
    }
    return address;
  }

  /** Reads edges from NetFlow CSV lines; each line is pre-split into its fields. */
  public class NetflowEdgeReader extends
      TextEdgeReaderFromEachLineProcessed<String[]> {

    /**
     * One formatter per reader because {@link SimpleDateFormat} is not
     * thread-safe. Assumes a reader instance is not shared between threads.
     */
    private final DateFormat dateFormat = new SimpleDateFormat(DATE_PATTERN);

    /** {@inheritDoc} */
    @Override
    protected String[] preprocessLine(Text line) throws IOException {
      // Text#getBytes() exposes the whole backing buffer; only the first
      // getLength() bytes belong to the current line.
      return new String(line.getBytes(), 0, line.getLength(), ASCII).split(",");
    }

    /** {@inheritDoc} */
    @Override
    protected LongWritable getTargetVertexId(String[] line)
        throws IOException {
      // NOTE(review): the target is read from the column before the source;
      // confirm this matches the column order of the CSV export.
      return vertexId(line, COL_TARGET_IP);
    }

    /** {@inheritDoc} */
    @Override
    protected LongWritable getSourceVertexId(String[] line)
        throws IOException {
      return vertexId(line, COL_SOURCE_IP);
    }

    /**
     * Builds the packed edge value described on the enclosing class.
     *
     * @throws IOException if a required column is missing or cannot be parsed
     */
    @Override
    protected LongWritable getValue(String[] line) throws IOException {
      try {
        Date terminationDate = dateFormat.parse(line[COL_END_TIME].trim());
        // Widen to long before shifting: an int shift only uses the low five
        // bits of the distance, so "<< 48" on an int is really "<< 16".
        long srcPort = Integer.parseInt(line[COL_SRC_PORT].trim()) & PORT_MASK;
        long dstPort = Integer.parseInt(line[COL_DST_PORT].trim()) & PORT_MASK;
        Protocol protocol = Protocol.valueOf(line[COL_PROTOCOL].trim());
        long protocolBits = protocol.id & PROTOCOL_MASK;
        return new LongWritable(
            srcPort << SRC_PORT_SHIFT
            | dstPort << DST_PORT_SHIFT
            | protocolBits << PROTOCOL_SHIFT
            | (terminationDate.getTime() & TIME_MASK));
      } catch (ParseException e) {
        throw new IOException(e);
      } catch (RuntimeException e) {
        // Missing column, non-numeric port or unknown protocol name.
        throw new IOException(e);
      }
    }

    /**
     * Converts the IPv4 address in the given column into a vertex id.
     *
     * @throws IOException if the column is missing or is not a valid address,
     *     mirroring how {@link #getValue} reports malformed records
     */
    private LongWritable vertexId(String[] line, int column) throws IOException {
      try {
        return new LongWritable(ipToLong(line[column]));
      } catch (RuntimeException e) {
        throw new IOException("Malformed IPv4 address in column " + column, e);
      }
    }
  }

  /** Creates a fresh {@link NetflowEdgeReader}; the split and context are not inspected here. */
  @Override
  public EdgeReader<LongWritable, LongWritable> createEdgeReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    return new NetflowEdgeReader();
  }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment