Commit 1c7937c3 authored by Gurvinder Singh

added support for reading lzo files

parent cd8890ca
@@ -26,8 +26,8 @@ def top_ports(csv, num=10):
 def ports_count_by_ip3(csv):
     ips = csv.map(lambda x: ((x[DEST_PORT], x[SRC_IP], x[DEST_IP]), 1))
-    ip_count = ips.reduceByKey(add, numPartitions=30)
-    return ip_count.map(lambda x: (x[1], x[0])).sortByKey(False, numPartitions=30).take(20)
+    ip_count = ips.reduceByKey(add, numPartitions=120)
+    return ip_count.map(lambda x: (x[1], x[0])).sortByKey(False, numPartitions=120).take(20)
 def ports_count_by_ip(csv):
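
For context, ports_count_by_ip3 is the standard top-N-by-count pattern: emit ((port, src, dest), 1) pairs, sum them with reduceByKey, swap count and key, sort descending, and take the head of the result. Raising numPartitions from 30 to 120 spreads the shuffle over more, smaller tasks, which usually lowers per-task memory pressure on large inputs. Below is a minimal self-contained sketch of the same pattern; the column indices, sample rows, and local master are invented for illustration (the real index constants live in netflowAlgs):

from operator import add
from pyspark import SparkContext

sc = SparkContext("local[2]", "topn-sketch")

# Hypothetical column layout; the real indices are constants in netflowAlgs.
SRC_IP, DEST_IP, DEST_PORT = 0, 1, 2

rows = sc.parallelize([
    ["10.0.0.1", "10.0.0.2", "443"],
    ["10.0.0.1", "10.0.0.2", "443"],
    ["10.0.0.3", "10.0.0.2", "80"],
])

# Count flows per (port, src, dest) triple, then list the largest counts first.
counts = rows.map(lambda x: ((x[DEST_PORT], x[SRC_IP], x[DEST_IP]), 1)).reduceByKey(add, numPartitions=4)
print(counts.map(lambda x: (x[1], x[0])).sortByKey(False, numPartitions=4).take(20))
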
......
 from pyspark.conf import SparkConf
 from pyspark import SparkContext
 import argparse
+import os
 from netflowAlgs import top_ips, top_ports, ports_count_by_ip, ports_count_by_ip3
 DESCRIPTION = "Analyze netflow data"
 conf = SparkConf()
-conf.setAppName("Netflow test").set("spark.executor.memory", "2g")
+#conf.setAppName("Netflow test").set("spark.executor.memory", "12g").set("spark.mesos.coarse","true")
+conf.setAppName("Netflow test").set("spark.executor.memory", "12g")
 sc = SparkContext(conf=conf)
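
Since SparkConf setters return the conf object itself, the app name and memory setting can also be chained into a single expression. A hedged, equivalent sketch with the settings copied from the commit; spark.mesos.coarse stays commented out, as the author left it:

from pyspark.conf import SparkConf
from pyspark import SparkContext

# Equivalent chained construction; setAll takes a list of (key, value) pairs.
conf = (SparkConf()
        .setAppName("Netflow test")
        .setAll([("spark.executor.memory", "12g")]))
# .set("spark.mesos.coarse", "true") could be appended to run Mesos in coarse-grained mode.
sc = SparkContext(conf=conf)
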
@@ -22,8 +24,12 @@ def parse_args():
     return parser.parse_args()
 opts = parse_args()
-csv = sc.textFile(opts.input).map(lambda x: x.split(","))
+fname, fext = os.path.splitext(opts.input)
+if fext == ".lzo":
+    csv = sc.newAPIHadoopFile(opts.input,"com.hadoop.mapreduce.LzoTextInputFormat","org.apache.hadoop.io.LongWritable","org.apache.hadoop.io.Text").map(lambda x: x[1].split(","))
+else:
+    csv = sc.textFile(opts.input).map(lambda x: x.split(","))
 if opts.find_top_ports:
     print "Finding top ports"
......
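
The new branch depends on the hadoop-lzo package: com.hadoop.mapreduce.LzoTextInputFormat and the native LZO codec must be available on the driver and executors for the read to work. Below is a hedged sketch of the same read path, using PySpark's keyword arguments for readability; the input path is hypothetical:

from pyspark import SparkContext

sc = SparkContext(appName="lzo-read-sketch")

path = "hdfs:///data/netflow/flows.lzo"  # hypothetical input path

# newAPIHadoopFile returns (key, value) pairs; with LzoTextInputFormat the key
# is the byte offset (LongWritable) and the value is the line text (Text).
pairs = sc.newAPIHadoopFile(
    path,
    inputFormatClass="com.hadoop.mapreduce.LzoTextInputFormat",
    keyClass="org.apache.hadoop.io.LongWritable",
    valueClass="org.apache.hadoop.io.Text",
)

# Keep only the line and split it into CSV columns, as the commit does.
csv = pairs.map(lambda kv: kv[1].split(","))

One operational note: a plain .lzo file is read as a single split unless an LZO index has been built for it (hadoop-lzo ships an indexer for this), so unindexed inputs will not parallelize across tasks.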