Commit 2b798099 authored by Morten Knutsen's avatar Morten Knutsen

Move algorithms to a separate file for testing with spark. Add some code to...

Move algorithms to a separate file for testing with spark. Add some code to count flows per port per IP address.
parent b9fc0413
def add(x, y):
    """Return x + y; used as the combiner function for reduceByKey."""
    total = x + y
    return total
def top_ips(csv, which="both", num=10):
    """Return the most frequent IP addresses seen in the flow records.

    Parameters:
        csv: RDD of flow records (lists of fields); index 1 is assumed to
             be the client address and index 2 the server address
             (TODO confirm column layout against the input data).
        which: which address column(s) to count — "both", "client" or
               "server".
        num: number of results to return.

    Returns:
        A list of (count, address) tuples, sorted by descending count.

    Raises:
        ValueError: if `which` is not one of the accepted values.
    """
    if which == "both":
        ips = csv.flatMap(lambda x: x[1:3])
    elif which == "client":
        ips = csv.map(lambda x: x[1])
    elif which == "server":
        ips = csv.map(lambda x: x[2])
    else:
        # Previously an unrecognized value fell through with `ips` unbound,
        # raising an opaque UnboundLocalError; fail fast with a clear message.
        raise ValueError("which must be 'both', 'client' or 'server', got %r" % (which,))
    ip_count = ips.map(lambda x: (x, 1)).reduceByKey(add)
    # Swap to (count, address) so sortByKey(False) orders by descending count.
    return ip_count.map(lambda x: (x[1], x[0])).sortByKey(False).take(num)
def top_ports(csv, num=10):
    """Return the num most frequent ports as (count, port) tuples.

    Field 3 of each record is taken to be the port column
    (presumably — confirm against the input format).
    """
    # Chain the projection and pairing maps, then tally per port.
    port_totals = (csv.map(lambda row: row[3])
                      .map(lambda port: (port, 1))
                      .reduceByKey(add))
    # (count, port) ordering lets sortByKey(False) sort by descending count.
    swapped = port_totals.map(lambda pair: (pair[1], pair[0]))
    return swapped.sortByKey(False).take(num)
def ports_count_by_ip3(csv):
    """Count flows per (port, source address, destination address) triple.

    Keys on fields 6, 3 and 4 of each record (per the caller's output,
    these are port, source addr and dest addr — confirm against the data).
    Returns the top 20 as (count, (port, src, dst)) tuples, most
    frequent first.
    """
    keyed = csv.map(lambda row: ((row[6], row[3], row[4]), 1))
    totals = keyed.reduceByKey(add)
    descending = totals.map(lambda kv: (kv[1], kv[0])).sortByKey(False)
    return descending.take(20)
def ports_count_by_ip(csv):
    """Count flows per (port, address), merging source and destination sides.

    Fields 3 and 4 are each paired with field 6 (per the caller's output,
    port and addresses — confirm against the data) so an address's counts
    accumulate whether it appeared as source or destination.
    Returns the top 20 as (count, (port, address)) tuples, most frequent
    first.
    """
    by_src = csv.map(lambda row: ((row[6], row[3]), 1))
    by_dst = csv.map(lambda row: ((row[6], row[4]), 1))
    merged = by_src.union(by_dst).reduceByKey(add)
    return merged.map(lambda kv: (kv[1], kv[0])).sortByKey(False).take(20)
from pyspark.conf import SparkConf
from pyspark import SparkContext
import argparse
from netflowAlgs import top_ips, top_ports, ports_count_by_ip, ports_count_by_ip3
DESCRIPTION = "Analyze netflow data"
......@@ -15,30 +16,11 @@ def parse_args():
parser.add_argument('--input', help="Data file to read")
parser.add_argument('--find-top-ports', action="store_true", help="Find top ports")
parser.add_argument('--find-top-ssh-clients', action="store_true", help="Find addresses of most active ssh clients")
parser.add_argument('--count-ports-by-ip', action="store_true", help="Count number of flows per port per address")
parser.add_argument('--count-ports-by-src-dst-ip', action="store_true", help="Count number of flows per port per src/dst address")
return parser.parse_args()
def add(x, y):
    """Sum two values; passed to reduceByKey as the combining function."""
    result = x + y
    return result
def top_ips(csv, which="both", num=10):
    """Return the most frequent IP addresses seen in the flow records.

    Parameters:
        csv: RDD of flow records (lists of fields); index 1 is assumed to
             be the client address and index 2 the server address
             (TODO confirm column layout against the input data).
        which: which address column(s) to count — "both", "client" or
               "server".
        num: number of results to return.

    Returns:
        A list of (count, address) tuples, sorted by descending count.

    Raises:
        ValueError: if `which` is not one of the accepted values.
    """
    if which == "both":
        ips = csv.flatMap(lambda x: x[1:3])
    elif which == "client":
        ips = csv.map(lambda x: x[1])
    elif which == "server":
        ips = csv.map(lambda x: x[2])
    else:
        # Previously an unrecognized value fell through with `ips` unbound,
        # raising an opaque UnboundLocalError; fail fast with a clear message.
        raise ValueError("which must be 'both', 'client' or 'server', got %r" % (which,))
    ip_count = ips.map(lambda x: (x, 1)).reduceByKey(add)
    # Swap to (count, address) so sortByKey(False) orders by descending count.
    return ip_count.map(lambda x: (x[1], x[0])).sortByKey(False).take(num)
def top_ports(csv, num=10):
    """Return the num most frequent ports as (count, port) tuples.

    Field 3 of each record is taken to be the port column
    (presumably — confirm against the input format).
    """
    occurrences = csv.map(lambda record: (record[3], 1))
    counted = occurrences.reduceByKey(add)
    # Key on the count so sortByKey(False) yields descending frequency.
    ranked = counted.map(lambda item: (item[1], item[0])).sortByKey(False)
    return ranked.take(num)
# Parse command-line options and load the input file as an RDD of
# comma-split records.
# NOTE(review): relies on a SparkContext named `sc` being defined
# elsewhere (e.g. created by the pyspark driver/shell) — confirm.
opts = parse_args()
csv = sc.textFile(opts.input).map(lambda x: x.split(","))
......@@ -54,10 +36,26 @@ if opts.find_top_ports:
if opts.find_top_ssh_clients:
print "Finding active ssh ips"
ssh_ips = csv.filter(lambda x: x[3] == '22')
ssh_ips = csv.filter(lambda x:x[3] == '22')
top_ssh_clients = top_ips(ssh_ips, "client", 15)
print "\n\n"
print "Top addresses involved in ssh traffic"
print "{:>15} {:>9}".format("Address", "Count")
for count, address in top_ssh_clients:
print "{:>15} {:>9}".format(address, count)
if opts.count_ports_by_ip:
print "Counting flows per port per IP address (top 20)"
results = ports_count_by_ip(csv)
print "\n\n"
print "{:>6} {:>15} {:>9}".format("Port", "Address", "Count")
for count, key in results:
print "{:>6} {:>15} {:>9}".format(key[0], key[1], count)
if opts.count_ports_by_src_dst_ip:
print "Counting flows per port per src/dst IP address (top 20)"
results = ports_count_by_ip3(csv)
print "\n\n"
print "{:>6} {:>15} {:>15} {:>9}".format("Port", "Source Addr", "Dest Addr", "Count")
for count, key in results:
print "{:>6} {:>15} {:>15} {:>9}".format(key[0], key[1], key[2], count)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment