Commit 9bdfc08d authored by Morten Knutsen's avatar Morten Knutsen

Increase memory and parallelism for reduce/sort.

parent 2b798099
......@@ -19,8 +19,8 @@ def top_ports(csv, num=10):
def ports_count_by_ip3(csv):
    """Return the top 20 (count, (ip, src_field, dst_field)) tuples in *csv*.

    Keys each record by the triple (x[6], x[3], x[4]) — presumably
    (IP, src port, dst port); verify against the input schema — counts
    occurrences per key, then returns the 20 most frequent, largest first.

    csv: an RDD of indexable records with at least 7 fields.
    Returns: list of (count, key_triple) tuples, descending by count.
    """
    ips = csv.map(lambda x: ((x[6], x[3], x[4]), 1))
    # numPartitions=30 raises shuffle parallelism for the reduce step.
    ip_count = ips.reduceByKey(add, numPartitions=30)
    # Swap to (count, key) so sortByKey(False) orders by count, descending.
    return ip_count.map(lambda x: (x[1], x[0])).sortByKey(False, numPartitions=30).take(20)
def ports_count_by_ip(csv):
srcs = csv.map(lambda x: ((x[6], x[3]), 1))
......
......@@ -6,7 +6,7 @@ from netflowAlgs import top_ips, top_ports, ports_count_by_ip, ports_count_by_ip
DESCRIPTION = "Analyze netflow data"

# Spark configuration: 2g executor memory; per-stage parallelism is set
# via numPartitions on the shuffle operations in netflowAlgs instead of
# spark.default.parallelism.
conf = SparkConf()
conf.setAppName("Netflow test").set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment