Commit c655f949 authored by Sigmund Augdal's avatar Sigmund Augdal
Test netflow analysis app

parent 6524f2c8
from pyspark.conf import SparkConf
from pyspark import SparkContext
conf = SparkConf()
conf.setAppName("Netflow test").set("spark.executor.memory", "1g").set("spark.default.parallelism", 4)
sc = SparkContext(conf=conf)
def add(x, y):
return x + y
#path = 'hdfs://daas/user/hdfs/trd_gw1_12_01_normalized.csv'
path = 'hdfs://daas/user/hdfs/trd_gw1_12_normalized.csv/*'
csv = sc.textFile(path).map(lambda x: x.split(",")).cache()
def top_ips(csv, num=10):
ips = csv.flatMap(lambda x: x[1:3])
ip_count = x: (x, 1)).reduceByKey(add)
return x: (x[1], x[0])).sortByKey(False).take(num)
def top_ports(csv, num=10):
ports = x: x[3])
port_count = x: (x, 1)).reduceByKey(add)
return x: (x[1], x[0])).sortByKey(False).take(num)
# print "Finding top ports"
# top = top_ports(csv)
# print "Port Count"
# for count, port in top:
# print port, count
print "Finding active ssh ips"
ssh_ips = csv.filter(lambda x: x[3] == '22')
print top_ips(ssh_ips, 15)
