Commit 5b279616 authored by Gurvinder Singh

added example apps

parent 20a41c71
<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>Akka repository</id>
      <url>http://repo.akka.io/releases</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.3.0</version>
    </dependency>
  </dependencies>
  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>2.3.2</version>
          <configuration>
            <source>1.7</source>
            <target>1.7</target>
            <compilerArgument></compilerArgument>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.List;

// Splits a raw CSV line into its fields.
class Split implements Function<String, String[]> {
    public String[] call(String s) {
        return s.split(",");
    }
}

// Pairs each value with a count of 1, ready for reduceByKey.
class MKTuple implements PairFunction<String, String, Integer> {
    public Tuple2<String, Integer> call(String x) {
        return new Tuple2<String, Integer>(x, 1);
    }
}

// Sums two partial counts.
class Add implements Function2<Integer, Integer, Integer> {
    public Integer call(Integer a, Integer b) {
        return a + b;
    }
}

// Swaps (port, count) into (count, port) so the pairs can be sorted by count.
class Swap implements PairFunction<Tuple2<String, Integer>, Integer, String> {
    public Tuple2<Integer, String> call(Tuple2<String, Integer> in) {
        return new Tuple2<Integer, String>(in._2, in._1);
    }
}

// Extracts the port field (column index 6) from a split record.
class GetPort implements Function<String[], String> {
    public String call(String[] a) {
        return a[6];
    }
}

public class SimpleApp {
    public static void main(String[] args) {
        String logFile = "hdfs://daas/daas_flows/trd-gw-2014-05-03.csv";
        JavaSparkContext sc = new JavaSparkContext();
        JavaRDD<String> logData = sc.textFile(logFile);
        JavaRDD<String[]> csv = logData.map(new Split());
        topPorts(csv);
    }

    // Counts occurrences of each port and prints the 15 most frequent ones.
    public static void topPorts(JavaRDD<String[]> csv) {
        JavaRDD<String> ports = csv.map(new GetPort());
        JavaPairRDD<String, Integer> pairs = ports.mapToPair(new MKTuple());
        JavaPairRDD<String, Integer> port_count = pairs.reduceByKey(new Add());
        JavaPairRDD<Integer, String> invpairs = port_count.mapToPair(new Swap());
        JavaPairRDD<Integer, String> sorted = invpairs.sortByKey(false);
        List<Tuple2<Integer, String>> tops = sorted.take(15);
        for (int i = 0; i < 15; ++i) {
            System.out.println(String.format(" %s %s", tops.get(i)._1, tops.get(i)._2));
        }
    }
}
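The no-argument JavaSparkContext() above picks up its master URL and application name from the system properties that spark-submit injects at launch time. Below is a minimal sketch of the same setup with an explicit SparkConf, for running the job outside spark-submit; the class name and the local[*] master are illustrative assumptions, not part of the original example.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SimpleAppExplicitConf {
    public static void main(String[] args) {
        // Hypothetical explicit configuration; the original app relies on
        // spark-submit to supply these settings instead.
        SparkConf conf = new SparkConf()
                .setAppName("Simple Project")
                .setMaster("local[*]");   // assumption: run locally for testing
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> logData = sc.textFile("hdfs://daas/daas_flows/trd-gw-2014-05-03.csv");
        JavaRDD<String[]> csv = logData.map(new Split());
        SimpleApp.topPorts(csv);

        sc.stop();
    }
}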
@@ -17,13 +17,11 @@ def top_ips(csv, which="both", num=10):
    ip_count = ips.map(lambda x: (x, 1)).reduceByKey(add)
    return ip_count.map(lambda x: (x[1], x[0])).sortByKey(False).take(num)
def top_ports(csv, num=10):
    ports = csv.map(lambda x: x[DEST_PORT])
    port_count = ports.map(lambda x: (x, 1)).reduceByKey(add)
    return port_count.map(lambda x: (x[1], x[0])).sortByKey(False).take(num)
def ports_count_by_ip3(csv):
    ips = csv.map(lambda x: ((x[DEST_PORT], x[SRC_IP], x[DEST_IP]), 1))
    ip_count = ips.reduceByKey(add, numPartitions=120)
......
@@ -8,7 +8,7 @@ DESCRIPTION = "Analyze netflow data"
conf = SparkConf()
#conf.setAppName("Netflow test").set("spark.executor.memory", "12g").set("spark.mesos.coarse","true")
conf.setAppName("Netflow test").set("spark.executor.memory", "12g")
conf.setAppName("Netflow test").set("spark.executor.memory", "12g").set("spark.io.compression.codec","lzf").set("spark.shuffle.spill","false").set("spark.mesos.coarse","true")
sc = SparkContext(conf=conf)
......
@@ -4,7 +4,7 @@ from pyspark.sql import SQLContext
from pyspark import StorageLevel
conf = SparkConf()
conf.setAppName("SQL test").set("spark.executor.memory", "17g").set("spark.mesos.coarse","true")
conf.setAppName("SQL test").set("spark.executor.memory", "17g").set("spark.mesos.coarse","true").set("spark.io.compression.codec","lzf").set("spark.shuffle.spill","false")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
......
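Both configuration hunks above apply the same tuning: the LZF compression codec, shuffle spill disabled, and coarse-grained Mesos mode. For reference, here is a rough sketch of how the equivalent settings could be applied from the Java side with the standard SparkConf API; the class name is hypothetical, and the app name and memory value are carried over from the PySpark scripts above rather than from any existing Java code.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class TunedConfExample {
    public static void main(String[] args) {
        // Same configuration keys the PySpark scripts set above;
        // the values are assumptions copied from those scripts.
        SparkConf conf = new SparkConf()
                .setAppName("Netflow test")
                .set("spark.executor.memory", "12g")
                .set("spark.io.compression.codec", "lzf")  // use the LZF codec for shuffle/IO compression
                .set("spark.shuffle.spill", "false")       // do not spill shuffle aggregations to disk
                .set("spark.mesos.coarse", "true");        // coarse-grained Mesos scheduling
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... job logic would go here ...
        sc.stop();
    }
}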
@@ -23,7 +23,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
-<version>1.0.0</version>
+<version>1.1.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
......