netflowAlgs.py 1.17 KB
Newer Older
1
2
3
4
5
# Positions of the fields this module reads from each record; the functions
# below index records as x[SRC_IP], x[DEST_IP] and x[DEST_PORT].
SRC_IP, DEST_IP, DEST_PORT = 3, 4, 6


6
7
8
def add(x, y):
    """Return ``x + y``.

    Used in this module as the combining function handed to ``reduceByKey``.
    """
    total = x + y
    return total

9

10
11
def top_ips(csv, which="both", num=10):
    """Return the ``num`` most frequently seen IP values with their counts.

    Args:
        csv: Collection of records exposing ``map``/``flatMap`` (presumably a
            PySpark RDD -- confirm against the caller). Each record must be
            indexable by ``SRC_IP`` and ``DEST_IP``.
        which: Selects the field(s) to count: ``"client"`` uses the
            ``SRC_IP`` field, ``"server"`` uses the ``DEST_IP`` field and
            ``"both"`` counts both fields of every record.
        num: Maximum number of entries to return.

    Returns:
        The first ``num`` ``(count, ip)`` pairs after sorting by count in
        descending order.

    Raises:
        ValueError: If ``which`` is not ``"both"``, ``"client"`` or
            ``"server"``.
    """
    # Validate up front: previously an unknown selector left ``ips`` unbound
    # and surfaced later as a confusing UnboundLocalError.
    if which not in ("both", "client", "server"):
        raise ValueError(
            "which must be 'both', 'client' or 'server', got %r" % (which,)
        )

    if which == "both":
        # flatMap flattens the returned pair, so each record contributes
        # both of its addresses to the count.
        ips = csv.flatMap(lambda x: (x[SRC_IP], x[DEST_IP]))
    elif which == "client":
        ips = csv.map(lambda x: x[SRC_IP])
    else:
        ips = csv.map(lambda x: x[DEST_IP])

    ip_count = ips.map(lambda x: (x, 1)).reduceByKey(add)
    # Swap to (count, ip) so sortByKey orders on the count; False requests
    # descending order.
    return ip_count.map(lambda x: (x[1], x[0])).sortByKey(False).take(num)


def top_ports(csv, num=10):
    """Return the ``num`` most frequently seen destination-port values.

    Args:
        csv: Collection of records exposing ``map`` (presumably a PySpark
            RDD -- confirm against the caller). Each record must be indexable
            by ``DEST_PORT``.
        num: Maximum number of entries to return.

    Returns:
        The first ``num`` ``(count, port)`` pairs after sorting by count in
        descending order.
    """
    ports = csv.map(lambda x: x[DEST_PORT])
    port_count = ports.map(lambda x: (x, 1)).reduceByKey(add)
    # Swap to (count, port) so sortByKey orders on the count; False requests
    # descending order.
    return port_count.map(lambda x: (x[1], x[0])).sortByKey(False).take(num)

26

27
def ports_count_by_ip3(csv, num=20, num_partitions=30):
    """Return the most frequent (port, source, destination) combinations.

    Args:
        csv: Collection of records exposing ``map`` (presumably a PySpark
            RDD -- confirm against the caller). Each record must be indexable
            by ``DEST_PORT``, ``SRC_IP`` and ``DEST_IP``.
        num: Maximum number of entries to return. Defaults to 20, the value
            that used to be hard-coded.
        num_partitions: Partition count passed to both ``reduceByKey`` and
            ``sortByKey``. Defaults to 30, the value that used to be
            hard-coded.

    Returns:
        The first ``num`` ``(count, (port, src_ip, dest_ip))`` pairs after
        sorting by count in descending order.
    """
    keyed = csv.map(lambda x: ((x[DEST_PORT], x[SRC_IP], x[DEST_IP]), 1))
    counts = keyed.reduceByKey(add, numPartitions=num_partitions)
    # Swap to (count, key) so sortByKey orders on the count; False requests
    # descending order.
    by_count = counts.map(lambda x: (x[1], x[0]))
    return by_count.sortByKey(False, numPartitions=num_partitions).take(num)
31

32

33
def ports_count_by_ip(csv, num=20):
    """Return the most frequent (port, ip) combinations.

    Every record contributes two keys: its ``DEST_PORT`` field paired with
    its ``SRC_IP`` field, and the same port paired with its ``DEST_IP``
    field.

    Args:
        csv: Collection of records exposing ``map`` (presumably a PySpark
            RDD -- confirm against the caller). Each record must be indexable
            by ``DEST_PORT``, ``SRC_IP`` and ``DEST_IP``.
        num: Maximum number of entries to return. Defaults to 20, the value
            that used to be hard-coded.

    Returns:
        The first ``num`` ``(count, (port, ip))`` pairs after sorting by
        count in descending order.
    """
    srcs = csv.map(lambda x: ((x[DEST_PORT], x[SRC_IP]), 1))
    dsts = csv.map(lambda x: ((x[DEST_PORT], x[DEST_IP]), 1))
    # Merge both views before reducing so a (port, ip) key is tallied no
    # matter which of the two fields the address came from.
    ips = srcs.union(dsts).reduceByKey(add)
    # Swap to (count, key) so sortByKey orders on the count; False requests
    # descending order.
    return ips.map(lambda x: (x[1], x[0])).sortByKey(False).take(num)