Commit 475a6402 authored by Sigmund Augdal's avatar Sigmund Augdal

Implemented the find-top-ports algorithm in Java 7. Oh the horror!

parent 81c8c934
......@@ -23,4 +23,19 @@
<version>2.3.0</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<compilerArgument></compilerArgument>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.List;
/** Splits one raw CSV line into its individual fields. */
class Split implements Function<String, String[]> {
    public String[] call(String line) {
        return line.split(",");
    }
}
/** Pairs each value with an initial count of one, word-count style. */
class MKTuple implements PairFunction<String, String, Integer> {
    public Tuple2<String, Integer> call(String value) {
        return new Tuple2<String, Integer>(value, 1);
    }
}
/** Sums two partial counts; used as the reduceByKey combiner. */
class Add implements Function2<Integer, Integer, Integer> {
    public Integer call(Integer left, Integer right) {
        return left + right;
    }
}
/** Inverts a (port, count) pair to (count, port) so sortByKey orders by count. */
class Swap implements PairFunction<Tuple2<String, Integer>, Integer, String> {
    public Tuple2<Integer, String> call(Tuple2<String, Integer> pair) {
        return new Tuple2<Integer, String>(pair._2, pair._1);
    }
}
/** Extracts the port field from a split CSV record. */
class GetPort implements Function<String[], String> {
    // Index of the port column in the flow CSV — assumes field 7 holds
    // the port; TODO(review): confirm against the exporter's schema.
    private static final int PORT_COLUMN = 6;

    public String call(String[] fields) {
        return fields[PORT_COLUMN];
    }
}
/**
 * Counts destination ports in a NetFlow CSV export with Spark's Java API
 * and prints the most frequent ones.
 *
 * <p>Fixes two compile errors in the previous revision: a duplicate
 * {@code main(String[])} left over from the Spark quick-start example, and a
 * {@code return sorted.take(num);} statement inside a void method that
 * referenced an undefined variable {@code num}.
 */
public class SimpleApp {
    /** Number of top ports printed by the single-argument overload. */
    private static final int DEFAULT_TOP_N = 15;

    public static void main(String[] args) {
        String logFile = "hdfs://daas/daas_flows/trd-gw-2014-05-03.csv";
        JavaSparkContext sc = new JavaSparkContext();
        JavaRDD<String> logData = sc.textFile(logFile);
        // Split every raw line into its CSV fields before aggregating.
        JavaRDD<String[]> csv = logData.map(new Split());
        topPorts(csv);
    }

    /** Prints the {@link #DEFAULT_TOP_N} most frequent ports in {@code csv}. */
    public static void topPorts(JavaRDD<String[]> csv) {
        topPorts(csv, DEFAULT_TOP_N);
    }

    /**
     * Prints the {@code num} most frequent ports in the given flow records.
     *
     * @param csv flow records, one {@code String[]} of CSV fields per line
     * @param num how many of the top ports to print
     */
    public static void topPorts(JavaRDD<String[]> csv, int num) {
        JavaRDD<String> ports = csv.map(new GetPort());
        // Classic word-count: emit (port, 1) pairs, then sum by key.
        JavaPairRDD<String, Integer> pairs = ports.mapToPair(new MKTuple());
        JavaPairRDD<String, Integer> portCount = pairs.reduceByKey(new Add());
        // Swap to (count, port) so sortByKey(false) orders by count, descending.
        JavaPairRDD<Integer, String> invPairs = portCount.mapToPair(new Swap());
        JavaPairRDD<Integer, String> sorted = invPairs.sortByKey(false);
        List<Tuple2<Integer, String>> tops = sorted.take(num);
        for (Tuple2<Integer, String> top : tops) {
            System.out.println(String.format(" %s %s", top._1, top._2));
        }
    }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment