Commit 7566712b authored by Gurvinder Singh's avatar Gurvinder Singh

added missing java code

parent d91267bf
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<repositories>
<repository>
<id>Akka repository</id>
<url>http://repo.akka.io/releases</url>
</repository>
</repositories>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<compilerArgument></compilerArgument>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.List;
class Split implements Function<String, String[]> {
public String[] call(String s) {
return s.split(",");
}
}
class MKTuple implements PairFunction<String, String, Integer> {
public Tuple2<String, Integer> call(String x) {
return new Tuple2<String, Integer>(x, 1);
}
}
class Add implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) {
return a + b;
}
}
class Swap implements PairFunction<Tuple2<String, Integer>, Integer, String> {
public Tuple2<Integer, String> call(Tuple2<String, Integer> in) {
return new Tuple2<Integer, String>(in._2, in._1);
}
}
class GetPort implements Function<String[], String> {
public String call(String[] a) {
return a[6];
}
}
public class SimpleApp {
public static void main(String[] args) {
String logFile = "hdfs://daas/daas_flows/trd-gw-2014-05-03.csv";
JavaSparkContext sc = new JavaSparkContext();
JavaRDD<String> logData = sc.textFile(logFile);
JavaRDD<String[]> csv = logData.map(new Split());
topPorts(csv);
}
public static void topPorts(JavaRDD<String[]> csv) {
JavaRDD<String> ports = csv.map(new GetPort());
JavaPairRDD<String, Integer> pairs = ports.mapToPair(new MKTuple());
JavaPairRDD<String, Integer> port_count = pairs.reduceByKey(new Add());
JavaPairRDD<Integer, String> invpairs = port_count.mapToPair(new Swap());
JavaPairRDD<Integer, String> sorted = invpairs.sortByKey(false);
List<Tuple2<Integer, String>> tops = sorted.take(15);
for (int i = 0; i < 15; ++i) {
System.out.println(String.format(" %s %s", tops.get(i)._1, tops.get(i)._2));
}
}
}
#Generated by Maven
#Thu May 14 19:59:09 CEST 2015
version=1.0
groupId=edu.berkeley
artifactId=simple-project
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment