netflowTest.py 1.05 KB
Newer Older
Sigmund Augdal's avatar
Sigmund Augdal committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from pyspark.conf import SparkConf
from pyspark import SparkContext

# Spark application config: 1 GB per executor, default parallelism of 4.
conf = (SparkConf()
        .setAppName("Netflow test")
        .set("spark.executor.memory", "1g")
        .set("spark.default.parallelism", 4))

sc = SparkContext(conf=conf)


def add(x, y):
    """Return x + y; used as the combiner function for reduceByKey."""
    total = x + y
    return total

# Alternative single-file input:
#path = 'hdfs://daas/user/hdfs/trd_gw1_12_01_normalized.csv'
path = 'hdfs://daas/user/hdfs/trd_gw1_12_normalized.csv/*'
# Parse each CSV line into a list of fields; cached because it is reused below.
csv = sc.textFile(path).map(lambda line: line.split(",")).cache()


def top_ips(csv, num=10):
    """Return the num most frequent IP addresses as (count, ip) pairs.

    Fields 1 and 2 of each record are treated as IP addresses (presumably
    src/dst -- confirm against the normalized CSV layout). Results are
    sorted by count, descending.
    """
    addresses = csv.flatMap(lambda record: record[1:3])
    counted = addresses.map(lambda addr: (addr, 1)).reduceByKey(add)
    swapped = counted.map(lambda pair: (pair[1], pair[0]))
    return swapped.sortByKey(False).take(num)


def top_ports(csv, num=10):
    """Return the num most frequent ports as (count, port) pairs.

    Field 3 of each record is treated as the port. Results are sorted by
    count, descending.
    """
    ports = csv.map(lambda record: record[3])
    counted = ports.map(lambda port: (port, 1)).reduceByKey(add)
    swapped = counted.map(lambda pair: (pair[1], pair[0]))
    return swapped.sortByKey(False).take(num)

# Example: uncomment to report the most common destination ports.
# print("Finding top ports")
# print("Port   Count")
# for count, port in top_ports(csv):
#     print(port, count)

# Keep only flows on port 22 (ssh), then report the most active IPs.
# Parenthesized single-argument print works identically under Python 2
# and Python 3, unlike the original print-statement syntax.
print("Finding active ssh ips")
ssh_ips = csv.filter(lambda x: x[3] == '22')
print(top_ips(ssh_ips, 15))