Commit eb339cee authored by Gurvinder Singh

added code for a Parquet test, as well as another app which tests numpy and scikit-learn support

parent 874f7510
from pyspark import SparkContext
import numpy as np
from sklearn.cross_validation import train_test_split, ShuffleSplit
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.tree import DecisionTreeClassifier
sc = SparkContext()
def zero_matrix(n, m):
    return np.zeros(n * m, dtype=int).reshape(n, m)

def vote_increment(y_est):
    increment = zero_matrix(y_est.size, n_ys)
    increment[np.arange(y_est.size), y_est] = 1
    return increment  # test point x class matrix with 1s marking the estimator prediction
X, y = make_classification()
X_train, X_test, y_train, y_test = train_test_split(X, y)
n_test = X_test.shape[0]
n_ys = np.unique(y_train).size
model = DecisionTreeClassifier()
# Draw random sub-samples of the training data (one per ShuffleSplit iteration).
samples = sc.parallelize(ShuffleSplit(y_train.size))
# Train a model for each sub-sample and apply it to the test data.
vote_tally = samples.map(lambda (index, _):
    model.fit(X_train[index], y_train[index]).predict(X_test)
).map(vote_increment).fold(zero_matrix(n_test, n_ys), np.add)
# Take the learner majority vote.
y_estimate_vote = np.argmax(vote_tally, axis = 1)
print(accuracy_score(y_test, y_estimate_vote))
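# For reference: a minimal numpy-only sketch of the aggregation the fold above
# performs, using made-up predictions (three estimators, four test points, two
# classes); the demo_* names are illustrative only.
demo_n_ys = 2
demo_predictions = [np.array([0, 1, 1, 0]),
                    np.array([0, 1, 0, 0]),
                    np.array([1, 1, 1, 0])]

def demo_vote_increment(y_est):
    # One row per test point, one column per class, with a 1 marking the prediction.
    inc = np.zeros((y_est.size, demo_n_ys), dtype=int)
    inc[np.arange(y_est.size), y_est] = 1
    return inc

# Equivalent of .map(vote_increment).fold(zero_matrix(n_test, n_ys), np.add):
demo_tally = sum(demo_vote_increment(p) for p in demo_predictions)
print(np.argmax(demo_tally, axis=1))  # majority vote per test point: [0 1 1 0]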
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark import StorageLevel
conf = SparkConf()
conf.setAppName("SQL test").set("spark.executor.memory", "17g").set("spark.mesos.coarse","true")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
# Parquet Save file
#lines = sc.textFile("hdfs://daas/daas_flows/trd-gw-2014-05-03.csv")
#parts = lines.map(lambda l: l.split(","))
#records = parts.map(lambda p: {"stime": str(p[0]), "endtime": str(p[1]), "srcip": str(p[3]), "dstip": str(p[4]), "srcport": str(p[5]), "dstport": str(p[6])})
#recordsTable = sqlCtx.inferSchema(records)
#recordsTable.saveAsParquetFile("hdfs://daas/daas_flows/trd-gw-2014-05-03-parquet")
# Load saved parquet file
precords = sqlCtx.parquetFile("hdfs://daas/daas_flows/trd-gw-2014-05-03.parquet")
precords.registerAsTable("precords")
sqlCtx.cacheTable("precords")
top_pdport = sqlCtx.sql("SELECT dstport, count(dstport) as c1 FROM precords GROUP BY dstport ORDER BY c1 DESC LIMIT 10").map(lambda x: (x.dstport, x.c1)).collect()
print "Top Dest ports:"
print "{:>6} {:>12}".format("Dest Port", "Count")
for port, count in top_port:
print "{:>6} {:>12}".format(port, count)
top_psport = sqlCtx.sql("SELECT srcport, count(srcport) as c1 FROM precords GROUP BY srcport ORDER BY c1 DESC LIMIT 10").map(lambda x: (x.srcport, x.c1)).collect()
print "Top Src ports:"
print "{:>6} {:>12}".format("Src Port", "Count")
for port, count in top_port:
print "{:>6} {:>12}".format(port, count)