# netflowAlgs.py
def add(x, y):
    """Return ``x + y``.

    Serves as the binary reducer passed to ``reduceByKey`` elsewhere in
    this module, so per-key ``1`` counts collapse into totals.
    """
    total = x + y
    return total

def top_ips(csv, which="both", num=10):
    """Return the ``num`` most frequent IP values in ``csv``.

    Args:
        csv: PySpark RDD of indexable records. Field 1 is treated as the
            client IP and field 2 as the server IP (assumed column layout;
            confirm against the loader that builds this RDD).
        which: ``"client"`` counts field 1 only, ``"server"`` counts
            field 2 only, and ``"both"`` counts fields 1 and 2 together.
        num: Maximum number of entries to return.

    Returns:
        List of up to ``num`` ``(count, ip)`` tuples, highest count first.

    Raises:
        ValueError: If ``which`` is not one of the supported values.
    """
    if which == "both":
        # x[1:3] yields fields 1 and 2, so each record contributes both IPs.
        ips = csv.flatMap(lambda x: x[1:3])
    elif which == "client":
        ips = csv.map(lambda x: x[1])
    elif which == "server":
        ips = csv.map(lambda x: x[2])
    else:
        # Without this guard `ips` stays unbound and the next line fails
        # with an UnboundLocalError that hides the real cause.
        raise ValueError(
            f"which must be 'both', 'client' or 'server', got {which!r}"
        )
    ip_count = ips.map(lambda x: (x, 1)).reduceByKey(add)
    # Swap to (count, ip) so sortByKey orders by frequency, descending.
    return ip_count.map(lambda x: (x[1], x[0])).sortByKey(False).take(num)


def top_ports(csv, num=10):
    """Return the ``num`` most frequent values of field 3 in ``csv``.

    Args:
        csv: PySpark RDD of indexable records; field 3 is counted
            (presumably a port, per the function name -- confirm).
        num: Maximum number of entries to return.

    Returns:
        List of up to ``num`` ``(count, value)`` tuples, highest count first.
    """
    ones = csv.map(lambda rec: (rec[3], 1))
    totals = ones.reduceByKey(add)
    # Put the count first so sortByKey ranks by frequency.
    by_frequency = totals.map(lambda kv: (kv[1], kv[0]))
    ranked = by_frequency.sortByKey(False)
    return ranked.take(num)

def ports_count_by_ip3(csv, num=20):
    """Return the most frequent ``(x[6], x[3], x[4])`` combinations.

    Each record adds one count to the key ``(x[6], x[3], x[4])``. Going by
    the function name, field 6 is presumably an IP and fields 3/4 the two
    ports -- confirm against the loader that builds this RDD.

    Args:
        csv: PySpark RDD of indexable records.
        num: Maximum number of entries to return. The default of 20 matches
            the value that was previously hard-coded.

    Returns:
        List of up to ``num`` ``(count, (x[6], x[3], x[4]))`` tuples,
        highest count first.
    """
    ips = csv.map(lambda x: ((x[6], x[3], x[4]), 1))
    ip_count = ips.reduceByKey(add)
    # Swap to (count, key) so sortByKey orders by frequency, descending.
    return ip_count.map(lambda x: (x[1], x[0])).sortByKey(False).take(num)

def ports_count_by_ip(csv, num=20):
    """Return the most frequent ``(x[6], port)`` pairs over fields 3 and 4.

    Every record contributes two counts: one to ``(x[6], x[3])`` and one to
    ``(x[6], x[4])``. When ``x[3] == x[4]``, that key is counted twice for
    the record. Field 6 is presumably an IP, per the function name --
    confirm against the loader.

    Args:
        csv: PySpark RDD of indexable records.
        num: Maximum number of entries to return. The default of 20 matches
            the value that was previously hard-coded.

    Returns:
        List of up to ``num`` ``(count, (x[6], port))`` tuples, highest
        count first.
    """
    srcs = csv.map(lambda x: ((x[6], x[3]), 1))
    dsts = csv.map(lambda x: ((x[6], x[4]), 1))
    # Pool both port fields before reducing so the counts combine per key.
    ips = srcs.union(dsts).reduceByKey(add)
    return ips.map(lambda x: (x[1], x[0])).sortByKey(False).take(num)