netflowTest.py 2.35 KB
Newer Older
Sigmund Augdal's avatar
Sigmund Augdal committed
1
2
from pyspark.conf import SparkConf
from pyspark import SparkContext
3
import argparse
4
from netflowAlgs import top_ips, top_ports, ports_count_by_ip, ports_count_by_ip3
5
6

# Program description handed to argparse.ArgumentParser in parse_args().
DESCRIPTION = "Analyze netflow data"
Sigmund Augdal's avatar
Sigmund Augdal committed
7
8

# Build the Spark configuration one setting at a time; each setter mutates
# `conf` in place, so the calls do not need to be chained.
conf = SparkConf()
conf.setAppName("Netflow test")
conf.set("spark.executor.memory", "1g")
conf.set("spark.default.parallelism", 15)
conf.set("spark.mesos.coarse", "false")

# Module-level Spark context used by the rest of the script.
sc = SparkContext(conf=conf)


14
15
16
17
18
def parse_args():
    """Parse the command-line options of the netflow analysis script.

    Returns:
        argparse.Namespace with the attributes ``input`` (path given with
        ``--input``) and the boolean flags ``find_top_ports``,
        ``find_top_ssh_clients``, ``count_ports_by_ip`` and
        ``count_ports_by_src_dst_ip`` (each False unless its switch is given).

    Raises:
        SystemExit: raised by argparse after printing a usage message when
            the arguments are invalid or ``--input`` is missing.
    """
    parser = argparse.ArgumentParser(description=DESCRIPTION)
    # The script always reads the input file, so reject a missing --input
    # here with a clear usage error instead of passing None on to Spark.
    parser.add_argument('--input', required=True, help="Data file to read")
    parser.add_argument('--find-top-ports', action="store_true",
                        help="Find top ports")
    parser.add_argument('--find-top-ssh-clients', action="store_true",
                        help="Find addresses of most active ssh clients")
    parser.add_argument('--count-ports-by-ip', action="store_true",
                        help="Count number of flows per port per address")
    parser.add_argument('--count-ports-by-src-dst-ip', action="store_true",
                        help="Count number of flows per port per src/dst address")

    return parser.parse_args()
Sigmund Augdal's avatar
Sigmund Augdal committed
23

24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# Read the command line once at start-up.
opts = parse_args()

# Load the input file through the Spark context and split every line on
# commas, giving one list of string fields per input line.
raw_lines = sc.textFile(opts.input)
csv = raw_lines.map(lambda line: line.split(","))

# --find-top-ports: print the result of top_ports() as a right-aligned table.
# Single-argument print(...) is used so the output is unchanged on Python 2
# while the code also parses on Python 3.
if opts.find_top_ports:
    print("Finding top ports")
    top = top_ports(csv)
    print("\n\n")
    print("Top ports:")
    row_format = "{:>6} {:>12}"
    print(row_format.format("Port", "Count"))
    # Each result is unpacked as (count, port); the table lists the port first.
    for count, port in top:
        print(row_format.format(port, count))

# --find-top-ssh-clients: print the addresses returned by top_ips() for the
# rows whose field 3 equals '22'. Single-argument print(...) keeps the output
# unchanged on Python 2 while also parsing on Python 3.
if opts.find_top_ssh_clients:
    print("Finding active ssh ips")
    # NOTE(review): presumably field 3 is the port column and '22' selects
    # ssh traffic -- confirm against the input file layout.
    ssh_ips = csv.filter(lambda x: x[3] == '22')
    top_ssh_clients = top_ips(ssh_ips, "client", 15)
    print("\n\n")
    print("Top addresses involved in ssh traffic")
    row_format = "{:>15} {:>9}"
    print(row_format.format("Address", "Count"))
    # Each result is unpacked as (count, address); the address is shown first.
    for count, address in top_ssh_clients:
        print(row_format.format(address, count))
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

# --count-ports-by-ip: print the result of ports_count_by_ip() as a table.
# Single-argument print(...) keeps the output unchanged on Python 2 while
# also parsing on Python 3.
if opts.count_ports_by_ip:
    print("Counting flows per port per IP address (top 20)")
    results = ports_count_by_ip(csv)
    print("\n\n")
    row_format = "{:>6} {:>15} {:>9}"
    print(row_format.format("Port", "Address", "Count"))
    # Each result is unpacked as (count, key); key[0] goes in the "Port"
    # column and key[1] in the "Address" column.
    for count, key in results:
        print(row_format.format(key[0], key[1], count))

# --count-ports-by-src-dst-ip: print the result of ports_count_by_ip3() as a
# table. Single-argument print(...) keeps the output unchanged on Python 2
# while also parsing on Python 3.
if opts.count_ports_by_src_dst_ip:
    print("Counting flows per port per src/dst IP address (top 20)")
    results = ports_count_by_ip3(csv)
    print("\n\n")
    row_format = "{:>6} {:>15} {:>15} {:>9}"
    print(row_format.format("Port", "Source Addr", "Dest Addr", "Count"))
    # Each result is unpacked as (count, key); key[0], key[1] and key[2] fill
    # the "Port", "Source Addr" and "Dest Addr" columns respectively.
    for count, key in results:
        print(row_format.format(key[0], key[1], key[2], count))