Commit 874f7510 authored by Gurvinder Singh's avatar Gurvinder Singh

Added support for caching the SQL table and running two different queries

parent 1586b6fd
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark import StorageLevel
# Spark SQL smoke test: load flow records from CSV files on HDFS, register
# them as a cached SQL table, and print the ten most frequent destination
# and source ports.


def _print_top_ports(title, header, rows):
    """Print a titled two-column table of (port, count) pairs.

    title  -- line printed above the table
    header -- label for the port column
    rows   -- iterable of (port, count) tuples, printed in the given order
    """
    print(title)
    print("{:>6} {:>12}".format(header, "Count"))
    for port, count in rows:
        print("{:>6} {:>12}".format(port, count))


# Configure the application once; 17g is the executor memory that was
# actually in effect (the earlier 12g setting was overwritten immediately).
conf = SparkConf()
conf.setAppName("SQL test").set("spark.executor.memory", "17g").set("spark.mesos.coarse","true")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)

lines = sc.textFile("hdfs://daas/daas_flows/trd-gw-2014-05-*.csv")
#lines = sc.textFile("hdfs://daas/spark/test")
parts = lines.map(lambda l: l.split(","))
#records = parts.map(lambda p: {"text": p[0], "val": int(p[1]), "val1": int(p[2])})
# Column 2 of the CSV is not used; every selected field is kept as a string.
records = parts.map(lambda p: {"stime": str(p[0]), "endtime": str(p[1]), "srcip": str(p[3]), "dstip": str(p[4]), "srcport": str(p[5]), "dstport": str(p[6])})

# Infer the schema from the dict records and expose them to SQL as "records".
recordsTable = sqlCtx.inferSchema(records)
recordsTable.registerAsTable("records")
# Cache the table because two separate aggregations run against it below.
sqlCtx.cacheTable("records")

top_dport = sqlCtx.sql("SELECT dstport, count(dstport) as c1 FROM records GROUP BY dstport ORDER BY c1 DESC LIMIT 10").map(lambda x: (x.dstport, x.c1)).collect()
_print_top_ports("Top Dest ports:", "Dest Port", top_dport)

top_sport = sqlCtx.sql("SELECT srcport, count(srcport) as c1 FROM records GROUP BY srcport ORDER BY c1 DESC LIMIT 10").map(lambda x: (x.srcport, x.c1)).collect()
_print_top_ports("Top Src ports:", "Src Port", top_sport)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment