Commit 0ac1f5ac authored by Sigmund Augdal's avatar Sigmund Augdal

Reimplemented the find top ports method using the sql interface

parent 1c7937c3
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext(appName='testSQL')
conf = SparkConf()
conf.setAppName("SQL test").set("spark.executor.memory", "12g").set("spark.mesos.coarse","true")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
#lines = sc.textFile("hdfs://daas/user/hdfs/csv-old/trd_gw1_12_01_normalized.csv")
lines = sc.textFile("hdfs://daas/spark/test")
lines = sc.textFile("hdfs://daas/daas_flows/trd-gw-2014-05-03.csv")
#lines = sc.textFile("hdfs://daas/spark/test")
parts = lines.map(lambda l: l.split(","))
records = parts.map(lambda p: {"text": p[0], "val": int(p[1]), "val1": int(p[2])})
#records = parts.map(lambda p: {"text": p[0], "val": int(p[1]), "val1": int(p[2])})
records = parts.map(lambda p: {"date": str(p[0]), "srcip": str(p[3]), "dstip": str(p[4]), "dstport": str(p[6])})
recordsTable = sqlCtx.inferSchema(records)
recordsTable.registerAsTable("records")
http = sqlCtx.sql("SELECT text FROM records WHERE val1 >= 1 AND val1 <= 41")
text = http.map(lambda p: "Text: "+p.text)
print(text.collect())
top_port = sqlCtx.sql("SELECT dstport, count(dstport) as c1 FROM records GROUP BY dstport ORDER BY c1 DESC LIMIT 10").map(lambda x: (x.dstport, x.c1)).collect()
print "Top ports:"
print "{:>6} {:>12}".format("Port", "Count")
for port, count in top_port:
print "{:>6} {:>12}".format(port, count)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment