Added constants for the CSV fields, and updated the older functions for the new format

SRC_IP = 3
def add(x, y):
return x + y
def top_ips(csv, which="both", num=10):
if which == "both":
ips = csv.flatMap(lambda x: x[1:3])
ips = csv.flatMap(lambda x: (x[SRC_IP], x[DEST_IP]))
elif which == "client":
ips = x: x[1])
ips = x: x[SRC_IP])
elif which == "server":
ips = x: x[2])
ips = x: x[DEST_IP])
ip_count = x: (x, 1)).reduceByKey(add)
return x: (x[1], x[0])).sortByKey(False).take(num)
def top_ports(csv, num=10):
ports = x: x[3])
ports = x: x[DEST_PORT])
port_count = x: (x, 1)).reduceByKey(add)
return x: (x[1], x[0])).sortByKey(False).take(num)
def ports_count_by_ip3(csv):
ips = x: ((x[6], x[3], x[4]), 1))
ips = x: ((x[DEST_PORT], x[SRC_IP], x[DEST_IP]), 1))
ip_count = ips.reduceByKey(add, numPartitions=30)
return x: (x[1], x[0])).sortByKey(False, numPartitions=30).take(20)
def ports_count_by_ip(csv):
srcs = x: ((x[6], x[3]), 1))
dsts = x: ((x[6], x[4]), 1))
srcs = x: ((x[DEST_PORT], x[SRC_IP]), 1))
dsts = x: ((x[DEST_PORT], x[DEST_IP]), 1))
ips = srcs.union(dsts).reduceByKey(add)
return x: (x[1], x[0])).sortByKey(False).take(20)
