def add(x, y):
    """Return ``x + y``; used as the combining function for ``reduceByKey``."""
    return x + y


def _top_counts(keys, num):
    """Count occurrences of each element of *keys* and return the most frequent.

    Args:
        keys: RDD-like object whose elements are the (hashable) values to count.
        num: Maximum number of results to return.

    Returns:
        A list of ``(count, key)`` tuples sorted by count, highest first,
        containing at most *num* entries.
    """
    counts = keys.map(lambda key: (key, 1)).reduceByKey(add)
    # Swap to (count, key) so that sortByKey orders by the count; the False
    # argument requests descending order.
    by_count = counts.map(lambda pair: (pair[1], pair[0]))
    return by_count.sortByKey(False).take(num)


def top_ips(csv, which="both", num=10):
    """Return the most frequently occurring IP values in *csv*.

    Args:
        csv: RDD-like object of indexable rows. Field 1 is read for
            ``"client"`` and field 2 for ``"server"``
            (NOTE(review): column meaning inferred from the option names --
            confirm against the input schema).
        which: ``"client"`` (field 1), ``"server"`` (field 2) or ``"both"``
            (fields 1 and 2 of every row are counted together).
        num: Maximum number of results to return.

    Returns:
        A list of ``(count, ip)`` tuples, highest count first.

    Raises:
        ValueError: If *which* is not one of the three supported values.
    """
    if which == "both":
        # The slice x[1:3] yields fields 1 and 2; flatMap emits each of them
        # as a separate element so both columns feed one shared tally.
        ips = csv.flatMap(lambda x: x[1:3])
    elif which == "client":
        ips = csv.map(lambda x: x[1])
    elif which == "server":
        ips = csv.map(lambda x: x[2])
    else:
        # Previously an unknown value fell through and failed later with an
        # UnboundLocalError on ``ips``; fail early with a clear message.
        raise ValueError(
            "which must be 'both', 'client' or 'server', got %r" % (which,)
        )
    return _top_counts(ips, num)


def top_ports(csv, num=10):
    """Return the most frequently occurring values of field 3 in *csv*.

    Args:
        csv: RDD-like object of indexable rows; field 3 is counted
            (presumably a port number -- confirm against the input schema).
        num: Maximum number of results to return.

    Returns:
        A list of ``(count, port)`` tuples, highest count first.
    """
    return _top_counts(csv.map(lambda x: x[3]), num)


def ports_count_by_ip3(csv, num=20):
    """Return the most frequent ``(field 6, field 3, field 4)`` combinations.

    Args:
        csv: RDD-like object of indexable rows.
            NOTE(review): field 6 looks like an IP and fields 3/4 like the
            source/destination ports -- confirm against the input schema.
        num: Maximum number of results to return (defaults to 20).

    Returns:
        A list of ``(count, (field6, field3, field4))`` tuples, highest
        count first.
    """
    return _top_counts(csv.map(lambda x: (x[6], x[3], x[4])), num)


def ports_count_by_ip(csv, num=20):
    """Return the most frequent ``(field 6, port)`` pairs.

    Every row contributes two pairs, ``(field 6, field 3)`` and
    ``(field 6, field 4)``, which are tallied together.

    Args:
        csv: RDD-like object of indexable rows.
            NOTE(review): field 6 looks like an IP and fields 3/4 like the
            source/destination ports -- confirm against the input schema.
        num: Maximum number of results to return (defaults to 20).

    Returns:
        A list of ``(count, (field6, port))`` tuples, highest count first.
    """
    # One flatMap emits both pairs per row, so the input is traversed once
    # rather than mapped twice and unioned; the resulting counts are the same.
    pairs = csv.flatMap(lambda x: ((x[6], x[3]), (x[6], x[4])))
    return _top_counts(pairs, num)