# Field indices into each input row.
SRC_IP = 3
DEST_IP = 4
DEST_PORT = 6


def add(x, y):
    """Return ``x + y``; used as the reduce function when summing counts."""
    return x + y


def _top_counts(keys, num, numPartitions=None):
    """Count each distinct element of *keys* and return the *num* most frequent.

    Args:
        keys: RDD-like collection of hashable keys (assumed to be a PySpark RDD).
        num: maximum number of results to return.
        numPartitions: forwarded to ``reduceByKey`` and ``sortByKey``;
            ``None`` leaves the partition count to those methods' defaults.

    Returns:
        A list of ``(count, key)`` tuples ordered by count, highest first.
    """
    counts = keys.map(lambda key: (key, 1)).reduceByKey(
        add, numPartitions=numPartitions
    )
    # Swap to (count, key) so that sortByKey orders by frequency.
    by_count = counts.map(lambda kv: (kv[1], kv[0]))
    return by_count.sortByKey(False, numPartitions=numPartitions).take(num)


def top_ips(csv, which="both", num=10):
    """Return the *num* most frequent IP values in *csv*.

    Args:
        csv: RDD of indexable rows (presumably parsed CSV records).
        which: ``"client"`` counts the ``SRC_IP`` field, ``"server"`` counts
            the ``DEST_IP`` field, and ``"both"`` counts both fields of every row.
        num: maximum number of results to return.

    Returns:
        A list of ``(count, ip)`` tuples, highest count first.

    Raises:
        ValueError: if *which* is not ``"both"``, ``"client"`` or ``"server"``.
    """
    if which == "both":
        # flatMap so each row contributes its source and destination separately.
        ips = csv.flatMap(lambda row: (row[SRC_IP], row[DEST_IP]))
    elif which == "client":
        ips = csv.map(lambda row: row[SRC_IP])
    elif which == "server":
        ips = csv.map(lambda row: row[DEST_IP])
    else:
        # Fail fast with a clear message instead of an UnboundLocalError on `ips`.
        raise ValueError(
            "which must be 'both', 'client' or 'server', got %r" % (which,)
        )
    return _top_counts(ips, num)


def top_ports(csv, num=10):
    """Return the *num* most frequent ``DEST_PORT`` values in *csv*.

    Args:
        csv: RDD of indexable rows (presumably parsed CSV records).
        num: maximum number of results to return.

    Returns:
        A list of ``(count, port)`` tuples, highest count first.
    """
    return _top_counts(csv.map(lambda row: row[DEST_PORT]), num)


def ports_count_by_ip3(csv, num=20, numPartitions=30):
    """Return the most frequent ``(DEST_PORT, SRC_IP, DEST_IP)`` triples.

    Args:
        csv: RDD of indexable rows (presumably parsed CSV records).
        num: maximum number of results to return (default 20).
        numPartitions: partition count used for both the reduce and the sort
            (default 30).

    Returns:
        A list of ``(count, (port, src_ip, dest_ip))`` tuples, highest count first.
    """
    triples = csv.map(lambda row: (row[DEST_PORT], row[SRC_IP], row[DEST_IP]))
    return _top_counts(triples, num, numPartitions=numPartitions)


def ports_count_by_ip(csv, num=20):
    """Return the most frequent ``(DEST_PORT, ip)`` pairs over both IP fields.

    Each row contributes one ``(DEST_PORT, SRC_IP)`` pair and one
    ``(DEST_PORT, DEST_IP)`` pair. Both are counted in the same tally.

    Args:
        csv: RDD of indexable rows (presumably parsed CSV records).
        num: maximum number of results to return (default 20).

    Returns:
        A list of ``(count, (port, ip))`` tuples, highest count first.
    """
    srcs = csv.map(lambda row: (row[DEST_PORT], row[SRC_IP]))
    dsts = csv.map(lambda row: (row[DEST_PORT], row[DEST_IP]))
    return _top_counts(srcs.union(dsts), num)