from pyspark.conf import SparkConf
from pyspark import SparkContext
import argparse
import os
from netflowAlgs import top_ips, top_ports, ports_count_by_ip, ports_count_by_ip3

DESCRIPTION = "Analyze netflow data"

# Spark driver setup: big executors, lzf shuffle compression, shuffle spill
# disabled, and coarse-grained Mesos scheduling.
conf = SparkConf()
conf.setAppName("Netflow test")
conf.set("spark.executor.memory", "12g")
conf.set("spark.io.compression.codec", "lzf")
conf.set("spark.shuffle.spill", "false")
conf.set("spark.mesos.coarse", "true")

sc = SparkContext(conf=conf)


16 17 18 19 20
def parse_args(argv=None):
    """Parse command line options for the netflow analysis script.

    Args:
        argv: optional list of argument strings; defaults to sys.argv[1:].
              Passing an explicit list makes the function testable.

    Returns:
        argparse.Namespace with `input` and the four boolean analysis flags.
    """
    parser = argparse.ArgumentParser(description=DESCRIPTION)
    # --input is consumed unconditionally below (os.path.splitext(opts.input)),
    # so require it here for a clear usage error instead of a TypeError later.
    parser.add_argument('--input', required=True, help="Data file to read")
    parser.add_argument('--find-top-ports', action="store_true",
                        help="Find top ports")
    parser.add_argument('--find-top-ssh-clients', action="store_true",
                        help="Find addresses of most active ssh clients")
    parser.add_argument('--count-ports-by-ip', action="store_true",
                        help="Count number of flows per port per address")
    parser.add_argument('--count-ports-by-src-dst-ip', action="store_true",
                        help="Count number of flows per port per src/dst address")
    return parser.parse_args(argv)
# Parse the CLI and load the netflow data as an RDD of comma-split records.
opts = parse_args()
fname, fext = os.path.splitext(opts.input)

if fext == ".lzo":
    # LZO-compressed input needs the Hadoop LZO input format; values are the
    # text lines, so split field 1 of each (key, value) pair.
    raw = sc.newAPIHadoopFile(
        opts.input,
        "com.hadoop.mapreduce.LzoTextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
    )
    csv = raw.map(lambda kv: kv[1].split(","))
else:
    csv = sc.textFile(opts.input).map(lambda line: line.split(","))

if opts.find_top_ports:
    # Report the most frequently seen ports.
    print("Finding top ports")
    port_counts = top_ports(csv)
    print("\n\n")
    print("Top ports:")
    row = "{:>6} {:>12}"
    print(row.format("Port", "Count"))
    for hits, port in port_counts:
        print(row.format(port, hits))

if opts.find_top_ssh_clients:
    # Keep only flows where field 3 equals '22' — presumably the ssh
    # destination port; confirm against the netflow column layout.
    print("Finding active ssh ips")
    ssh_flows = csv.filter(lambda rec: rec[3] == '22')
    top_ssh_clients = top_ips(ssh_flows, "client", 15)
    print("\n\n")
    print("Top addresses involved in ssh traffic")
    row = "{:>15} {:>9}"
    print(row.format("Address", "Count"))
    for hits, addr in top_ssh_clients:
        print(row.format(addr, hits))

if opts.count_ports_by_ip:
    # Flow counts keyed by (port, address); key[0] is the port, key[1] the
    # address as produced by ports_count_by_ip.
    print("Counting flows per port per IP address (top 20)")
    per_ip = ports_count_by_ip(csv)
    print("\n\n")
    row = "{:>6} {:>15} {:>9}"
    print(row.format("Port", "Address", "Count"))
    for hits, key in per_ip:
        print(row.format(key[0], key[1], hits))

if opts.count_ports_by_src_dst_ip:
    # Flow counts keyed by (port, src address, dst address) as produced by
    # ports_count_by_ip3.
    print("Counting flows per port per src/dst IP address (top 20)")
    per_pair = ports_count_by_ip3(csv)
    print("\n\n")
    row = "{:>6} {:>15} {:>15} {:>9}"
    print(row.format("Port", "Source Addr", "Dest Addr", "Count"))
    for hits, key in per_pair:
        print(row.format(key[0], key[1], key[2], hits))