"""Command-line driver that runs netflow analyses on Spark.

Each ``--find-*`` / ``--count-*`` flag selects one analysis implemented in
``netflowAlgs``; several flags may be combined in a single run.  Results are
printed to stdout as right-aligned text tables.
"""
import argparse

from pyspark import SparkContext
from pyspark.conf import SparkConf

from netflowAlgs import top_ips, top_ports, ports_count_by_ip, ports_count_by_ip3

DESCRIPTION = "Analyze netflow data"


def parse_args():
    """Parse and return the command-line options for this job."""
    parser = argparse.ArgumentParser(description=DESCRIPTION)
    # Required: without it sc.textFile(None) would fail with an obscure error.
    parser.add_argument('--input', required=True, help="Data file to read")
    parser.add_argument('--find-top-ports', action="store_true",
                        help="Find top ports")
    parser.add_argument('--find-top-ssh-clients', action="store_true",
                        help="Find addresses of most active ssh clients")
    parser.add_argument('--count-ports-by-ip', action="store_true",
                        help="Count number of flows per port per address")
    parser.add_argument('--count-ports-by-src-dst-ip', action="store_true",
                        help="Count number of flows per port per src/dst address")
    return parser.parse_args()


def _build_spark_context():
    """Create a SparkContext with this job's fixed configuration."""
    conf = (SparkConf()
            .setAppName("Netflow test")
            .set("spark.executor.memory", "1g")
            .set("spark.default.parallelism", 15)
            .set("spark.mesos.coarse", "false"))
    return SparkContext(conf=conf)


def _print_table(fmt, headers, rows, title=None):
    """Print a blank-line separator, an optional title, a header row and rows.

    Args:
        fmt: ``str.format`` template applied to the header and to every row.
        headers: sequence of column titles.
        rows: iterable of sequences, one per table row, in column order.
        title: optional line printed above the header.
    """
    # print("\n\n") emits three newlines in total, as the original did.
    print("\n\n")
    if title is not None:
        print(title)
    print(fmt.format(*headers))
    for row in rows:
        print(fmt.format(*row))


def _report_top_ports(flows):
    """Print the table of top ports computed by ``top_ports``."""
    print("Finding top ports")
    top = top_ports(flows)
    # Results are consumed as (count, port) pairs; the table shows port first.
    _print_table("{:>6} {:>12}", ("Port", "Count"),
                 ((port, count) for count, port in top),
                 title="Top ports:")


def _report_top_ssh_clients(flows):
    """Print the 15 most active addresses among rows whose 4th field is '22'."""
    print("Finding active ssh ips")
    # NOTE(review): assumes column 3 holds a port number (22 = SSH); confirm
    # against the input schema. Rows with fewer than 4 fields are skipped
    # rather than raising IndexError inside a Spark task.
    ssh_flows = flows.filter(
        lambda fields: len(fields) > 3 and fields[3] == '22')
    top_ssh_clients = top_ips(ssh_flows, "client", 15)
    _print_table("{:>15} {:>9}", ("Address", "Count"),
                 ((address, count) for count, address in top_ssh_clients),
                 title="Top addresses involved in ssh traffic")


def _report_ports_by_ip(flows):
    """Print per-(port, address) flow counts from ``ports_count_by_ip``."""
    # NOTE(review): the "top 20" limit is presumably applied inside
    # ports_count_by_ip; this function does not truncate the results.
    print("Counting flows per port per IP address (top 20)")
    results = ports_count_by_ip(flows)
    _print_table("{:>6} {:>15} {:>9}", ("Port", "Address", "Count"),
                 ((key[0], key[1], count) for count, key in results))


def _report_ports_by_src_dst_ip(flows):
    """Print per-(port, src, dst) flow counts from ``ports_count_by_ip3``."""
    # NOTE(review): as above, any "top 20" limit lives in ports_count_by_ip3.
    print("Counting flows per port per src/dst IP address (top 20)")
    results = ports_count_by_ip3(flows)
    _print_table("{:>6} {:>15} {:>15} {:>9}",
                 ("Port", "Source Addr", "Dest Addr", "Count"),
                 ((key[0], key[1], key[2], count) for count, key in results))


def main():
    """Parse options, run each requested analysis in order, then stop Spark."""
    opts = parse_args()
    # Create the SparkContext only after argument parsing succeeds, so that
    # --help or a usage error does not start a Spark application.
    sc = _build_spark_context()
    try:
        # Each input line is split naively on commas (no quoting support).
        flows = sc.textFile(opts.input).map(lambda line: line.split(","))
        if opts.find_top_ports:
            _report_top_ports(flows)
        if opts.find_top_ssh_clients:
            _report_top_ssh_clients(flows)
        if opts.count_ports_by_ip:
            _report_ports_by_ip(flows)
        if opts.count_ports_by_src_dst_ip:
            _report_ports_by_src_dst_ip(flows)
    finally:
        # Release executors/cluster resources even if an analysis fails.
        sc.stop()


if __name__ == "__main__":
    main()