Commit 1586b6fd authored by Gurvinder Singh's avatar Gurvinder Singh

reading all the netflow files and updated schema

parent 09a2842b
......@@ -7,11 +7,11 @@ conf.setAppName("SQL test").set("spark.executor.memory", "12g").set("spark.mesos
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
lines = sc.textFile("hdfs://daas/daas_flows/trd-gw-2014-05-03.csv")
lines = sc.textFile("hdfs://daas/daas_flows/trd-gw-2014-05-*.csv")
#lines = sc.textFile("hdfs://daas/spark/test")
parts = lines.map(lambda l: l.split(","))
#records = parts.map(lambda p: {"text": p[0], "val": int(p[1]), "val1": int(p[2])})
records = parts.map(lambda p: {"date": str(p[0]), "srcip": str(p[3]), "dstip": str(p[4]), "dstport": str(p[6])})
records = parts.map(lambda p: {"stime": str(p[0]), "endtime": str(p[1]), "srcip": str(p[3]), "dstip": str(p[4]), "srcport": str(p[5]), "dstport": str(p[6])})
recordsTable = sqlCtx.inferSchema(records)
recordsTable.registerAsTable("records")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment