Argh, A1 makes me so mad.jpg
Might as well give this one away. I'll just throw A2 together, since no matter what I do it comes out at 70-something... so frustrating _(:з」∠)_
Probably because of the marks, I already put a lot of annotations into A2 itself, so I won't cram any extra commentary into this post... mm.
!pip install wget
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
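A quick sanity check I could have added here (my own sketch, not part of the submitted notebook): confirm the Colab session actually picked up the Spark build installed above before running anything else.

# Sketch: verify the Spark runtime that findspark wired up (assumes the setup cell above succeeded)
print("Spark version:", sc.version)
print("SPARK_HOME   :", os.environ["SPARK_HOME"])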
import wget

link_to_data = 'https://github.com/tulip-lab/sit742/raw/master/Assessment/2019/data/bank.csv'
DataSet = wget.download(link_to_data)
!ls

# Import 'bank.csv' as a Spark dataframe and name it df
spark = SparkSession.builder.appName('ml-bank').getOrCreate()
df = spark.read.csv('bank.csv', header=True, inferSchema=True)

# Check the data distribution
# You may use printSchema()
# Use printSchema() to show the schema of the dataset
df.printSchema()
# Show the first 5 rows of data
df.show(5)
# Use describe() to show the distribution of the numerical columns
df.describe(['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous']).show()
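One extra distribution check that would have fit here (a sketch I'm adding, not in the original notebook): describe() only covers the numerical columns, so a simple groupBy on the target shows whether 'deposit' is roughly balanced.

# Sketch: check the class balance of the target column (assumes df loaded as above)
df.groupBy('deposit').count().show()
# The same idea works for any categorical column, e.g. df.groupBy('poutcome').count().show()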
# Select features ('age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan', 'campaign', 'pdays', 'previous', 'poutcome', 'deposit') as df2
# Select the required attributes as df2
df2 = df.select('age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan',
                'campaign', 'pdays', 'previous', 'poutcome', 'deposit')
# Show the schema of df2
df2.printSchema()
# ...and the first 5 rows of df2
df2.show(5)

# Remove invalid rows/records using spark.sql
# Register df2 as the table "bank"
df2.registerTempTable("bank")
# Use spark.sql to filter out rows whose attributes are "unknown".
# Not every attribute contains "unknown" values, so only the affected ones appear in the WHERE clause.
# Also, poutcome should only be "success" or "failure", so rows with "other" in that column are filtered out as well.
df3 = spark.sql("SELECT * FROM bank WHERE job <> 'unknown' AND education <> 'unknown' "
                "AND poutcome <> 'unknown' AND poutcome <> 'other'")
# Show the schema of the filtered dataframe
df3.printSchema()
# ...and its first 5 rows
df3.show(5)

# Convert categorical features to metric features using one-hot encoding
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler

# StringIndexer turns the categorical columns into numeric indexes for the later encoding step.
# Here we pick out every column except the numerical ones and the label.
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index").fit(df3)
            for column in list(set(df3.columns) - set(['age', 'balance', 'campaign', 'pdays', 'previous', 'deposit']))]
# Apply all the indexers in one go with a Pipeline.
pipeline = Pipeline(stages=indexers)
df4 = pipeline.fit(df3).transform(df3)

# Define which index columns go through one-hot encoding, and the names of the output columns.
encoder = OneHotEncoderEstimator(
    inputCols=["housing_index", "marital_index", "default_index", "loan_index",
               "job_index", "education_index", "poutcome_index"],
    outputCols=["housing_vec", "marital_vec", "default_vec", "loan_vec",
                "job_vec", "education_vec", "poutcome_vec"])
# The one-hot encoding transformation itself.
model_ohe = encoder.fit(df4)
df5 = model_ohe.transform(df4)

# Drop the original categorical columns and their index columns, keeping only the encoded vectors and the numerical data.
df6 = df5.select([column for column in df5.columns
                  if column not in ["housing", "marital", "default", "loan", "job", "education", "poutcome",
                                    "housing_index", "marital_index", "default_index", "loan_index",
                                    "job_index", "education_index", "poutcome_index"]])

# Assemble all features into a single column, in preparation for normalisation.
assembler = VectorAssembler(
    inputCols=["age", "balance", "campaign", "pdays", "previous",
               "job_vec", "education_vec", "marital_vec", "poutcome_vec",
               "housing_vec", "loan_vec", "default_vec"],
    outputCol="features")
df7 = assembler.transform(df6)

# Keep only the feature column...
df8 = df7.select("features")
# ...and show its schema
df8.printSchema()

# Bibliography
# https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml
# https://stackoverflow.com/questions/36942233/apply-stringindexer-to-several-columns-in-a-pyspark-dataframe
# https://spark.apache.org/docs/latest/ml-features.html#onehotencoderestimator

from pyspark.ml.feature import StringIndexer

# The attribute "deposit" is the label, so select it into its own dataframe as y...
y_df1 = df3.select("deposit")
# ...then turn the label into an index with StringIndexer...
indexer = StringIndexer(inputCol="deposit", outputCol="deposit_index")
y_df2 = indexer.fit(y_df1).transform(y_df1)
# ...and store it as y
y = y_df2.select('deposit_index')
y.printSchema()
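As a side note (my own sketch, not part of the marked submission): the indexing, encoding and assembling steps above could also be chained into a single Pipeline, which keeps the fitted stages together and makes it easy to reuse exactly the same preprocessing on new data.

# Sketch: the same preprocessing as one Pipeline (column names as used above)
cat_cols = ["job", "marital", "education", "default", "housing", "loan", "poutcome"]
stages = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in cat_cols]
stages.append(OneHotEncoderEstimator(inputCols=[c + "_index" for c in cat_cols],
                                     outputCols=[c + "_vec" for c in cat_cols]))
stages.append(VectorAssembler(inputCols=["age", "balance", "campaign", "pdays", "previous"] +
                                        [c + "_vec" for c in cat_cols],
                              outputCol="features"))
prep_model = Pipeline(stages=stages).fit(df3)
df_features = prep_model.transform(df3)   # carries 'features' alongside the original columns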
# Apply Min-Max normalisation to each attribute using MinMaxScaler
from pyspark.ml.feature import MinMaxScaler

# Use MinMaxScaler to normalise the "features" column
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(df8)
scaled = scalerModel.transform(df8)
# Print the scaled range
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
# ...and store the result as the dataframe X
X = scaled.select("scaledFeatures")
X.show(truncate=False)

# Bibliography
# https://spark.apache.org/docs/2.2.0/ml-features.html#minmaxscaler

from pyspark.sql.functions import monotonically_increasing_id

# Generate an "id" column for both X and y so they can be merged
X_index = X.select("*").withColumn("id", monotonically_increasing_id())
y_index = y.select("*").withColumn("id", monotonically_increasing_id())
# Merge X and y into one dataframe
data_model = y_index.join(X_index, "id", "outer").drop("id")
# Rename the columns and store the result as df100
df100 = data_model.toDF('label', 'features')
df100.show()
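A caveat worth flagging (my note, not from the original): monotonically_increasing_id() only guarantees increasing ids, not ids that line up across two separately built dataframes, so the join above relies on X and y keeping the same row order. Since df7 still carries the original 'deposit' column, a safer sketch is to index the label inside the same dataframe and skip the join entirely:

# Sketch: keep features and label in one dataframe (df7 still contains 'deposit')
label_indexer = StringIndexer(inputCol="deposit", outputCol="label")
df_safe = label_indexer.fit(df7).transform(df7).select("label", "features")
# MinMaxScaler could then be fitted on df_safe the same way it is fitted on df8 above.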
# Perform unsupervised learning on df2 with k-means.
# You can use the whole df2 as both training and testing data,
# and evaluate the clustering result using accuracy.
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import DoubleType

# Set up the k-means parameters.
# Because we want the clusters to "estimate" the label, we use 2 clusters to match the binomial label.
k_means = KMeans().setK(2).setSeed(1)
# Fit the k-means model and get its cluster assignments
cluster = k_means.fit(df100)
cluster_predictions = cluster.transform(df100)

# First evaluator for the clustering: ClusteringEvaluator, using the silhouette criterion
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(cluster_predictions)
print("Silhouette with squared euclidean distance: " + str(silhouette))

# Second evaluator: MulticlassClassificationEvaluator, using the accuracy criterion.
kmeans_acc_evaluator = MulticlassClassificationEvaluator()
# But first the k-means prediction column has to be cast from integer to double
cluster_predictions.printSchema()
cluster_predictions_1 = cluster_predictions.withColumn("prediction", cluster_predictions["prediction"].cast("double"))
# Then apply the accuracy criterion to the clustering result
print("Test Accuracy: " + str(kmeans_acc_evaluator.evaluate(cluster_predictions_1, {kmeans_acc_evaluator.metricName: "accuracy"})))

# Show the cluster centroids
centers = cluster.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
# ...and show some of the k-means cluster assignments
cluster_predictions_1.show()

# Bibliography
# https://stackoverflow.com/questions/32284620/how-to-change-a-dataframe-column-from-string-type-to-double-type-in-pyspark
# https://spark.apache.org/docs/2.4.2/ml-clustering.html
# https://jakevdp.github.io/PythonDataScienceHandbook/05.11-k-means.html
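One thing the accuracy number above hides (my own note, not in the submission): k-means assigns cluster ids arbitrarily, so cluster 0 is not guaranteed to correspond to label 0. A small sketch of the usual fix for the two-cluster case is to score both possible mappings and keep the better one:

# Sketch: account for the arbitrary cluster-id assignment before reading the accuracy
acc = kmeans_acc_evaluator.evaluate(cluster_predictions_1, {kmeans_acc_evaluator.metricName: "accuracy"})
# Swapping the two cluster ids flips every prediction, giving accuracy 1 - acc, so take the better of the two.
print("Clustering accuracy (best label mapping): " + str(max(acc, 1.0 - acc)))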
# Generate a scatter plot using the first two PCA components to investigate the data distribution.
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.ml.linalg import Vectors
import matplotlib.pyplot as plt
import numpy as np

# Set up the PCA parameters: only 2 components, since we want a 2-D plot
pca = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures")
# Fit and apply PCA to the scaled features (X)
model_pca = pca.fit(X)
result_pca = model_pca.transform(X)
# Show some of the PCA output
result_pca.show(truncate=False)

# The PCA components need to be in numpy array format for plotting;
# the numpy form also makes it easy to split out the two components.
np_pca_1 = np.array(result_pca.select('pcaFeatures').collect())
# Drop the extra array level that collect() introduces
np_pca = np_pca_1[:, 0, :]

# Scatter plot settings: figure size, title, x/y labels and dot size
plt.figure(dpi=120)
plt.suptitle('PCA features scatter plot', fontsize=15)
plt.xlabel('component 0', fontsize=10)
plt.ylabel('component 1', fontsize=10)
plt.scatter(np_pca[:, 0], np_pca[:, 1], s=5, alpha=1)

# Bibliography
# https://stackoverflow.com/questions/12444716/how-do-i-set-the-figure-title-and-axes-labels-font-size-in-matplotlib
# https://stackoverflow.com/questions/34007632/how-to-remove-a-column-in-a-numpy-array/34008274
# https://stackoverflow.com/questions/42116143/extracting-numpy-array-from-pyspark-dataframe
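To judge how much the 2-D picture actually captures (a sketch I'm adding, not graded content), the fitted PCAModel exposes the explained variance of each component:

# Sketch: how much variance the two plotted components explain (model_pca as fitted above)
ev = model_pca.explainedVariance.toArray()
print("Explained variance per component: " + str(ev))
print("Total explained by the 2 components: " + str(ev.sum()))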
# Split the dataframe into a training set and a test set
train, test = df100.randomSplit([0.7, 0.3], seed=742)
# ...and print how many rows each set has
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

# Bibliography
# https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa
# This reference applies to almost all of the supervised learning part.

# Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Set up the LogisticRegression parameters
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)
# Train the LogisticRegression model...
lrModel = lr.fit(train)
# ...and apply it to the test set
lr_predictions = lrModel.transform(test)
# Show some of the LogisticRegression predictions
lr_predictions.select('label', 'features', 'rawPrediction', 'prediction', 'probability').show(10)

# Use BinaryClassificationEvaluator for ROC and MulticlassClassificationEvaluator for accuracy
lr_evaluator = BinaryClassificationEvaluator()
lr_acc_evaluator = MulticlassClassificationEvaluator()

# Get the training summary of the LogisticRegression model
lr_trainingSummary = lrModel.summary
# Convert the ROC part of the summary to a pandas dataframe
lr_roc = lr_trainingSummary.roc.toPandas()
# ...and plot it (FPR on the x axis, TPR on the y axis)
plt.figure(dpi=120)
plt.title('ROC Curve', fontsize=15)
plt.xlabel('False Positive Rate', fontsize=10)
plt.ylabel('True Positive Rate', fontsize=10)
plt.plot(lr_roc['FPR'], lr_roc['TPR'])
plt.show()

# Show the AUC on the training set
print('Training set areaUnderROC: ' + str(lr_trainingSummary.areaUnderROC))
# Show the AUC on the test set
print("Test Area Under ROC: " + str(lr_evaluator.evaluate(lr_predictions, {lr_evaluator.metricName: "areaUnderROC"})))
# Show the accuracy of Logistic Regression on the test set
print("Test Accuracy: " + str(lr_acc_evaluator.evaluate(lr_predictions, {lr_acc_evaluator.metricName: "accuracy"})))

# Examine the coefficients
import matplotlib.pyplot as plt
import numpy as np

# Store the (sorted) coefficients in a numpy array
beta = np.sort(lrModel.coefficients)
# ...and plot them
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()
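If there were marks left for tuning (a sketch of my own, using only standard pyspark.ml.tuning classes), the regularisation of the logistic regression could be chosen by cross-validation instead of the defaults:

# Sketch: pick regParam / elasticNetParam by 5-fold cross-validation on the training set
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.0, 0.01, 0.1])
              .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
              .build())
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=param_grid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=5,
                    seed=742)
cv_model = cv.fit(train)
print("Best model test AUC: " + str(BinaryClassificationEvaluator().evaluate(cv_model.transform(test))))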
# Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier

# Set up the Decision Tree classifier
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=5)
# Train the Decision Tree model...
dtModel = dt.fit(train)
# ...and apply it to the test set
dt_predictions = dtModel.transform(test)
# Show some of the Decision Tree predictions on the test set
dt_predictions.select('label', 'features', 'rawPrediction', 'prediction', 'probability').show(10)

# The same two evaluators as for LogisticRegression
dt_evaluator = BinaryClassificationEvaluator()
dt_acc_evaluator = MulticlassClassificationEvaluator()
# Show the AUC on the test set
print("Test Area Under ROC: " + str(dt_evaluator.evaluate(dt_predictions, {dt_evaluator.metricName: "areaUnderROC"})))
# Show the accuracy on the test set
print("Test Accuracy: " + str(dt_acc_evaluator.evaluate(dt_predictions, {dt_acc_evaluator.metricName: "accuracy"})))
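Purely as an aside (my sketch, not graded content): the fitted tree exposes featureImportances, which is a cheap way to see which of the assembled features actually drive the splits.

# Sketch: inspect what the fitted tree relies on
print("Tree depth / node count: %d / %d" % (dtModel.depth, dtModel.numNodes))
print("Feature importances: " + str(dtModel.featureImportances))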
# Naive Bayes
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Set up the NaiveBayes model
by = NaiveBayes(featuresCol='features', labelCol='label')
# Train the NaiveBayes model...
byModel = by.fit(train)
# ...and apply it to the test set
by_predictions = byModel.transform(test)
# Show some of the NaiveBayes predictions on the test set
by_predictions.select('label', 'features', 'rawPrediction', 'prediction', 'probability').show(10)

# The same two evaluators as for LogisticRegression
by_evaluator = BinaryClassificationEvaluator()
by_acc_evaluator = MulticlassClassificationEvaluator()
# Show the AUC on the test set
print("Test Area Under ROC: " + str(by_evaluator.evaluate(by_predictions, {by_evaluator.metricName: "areaUnderROC"})))
# Show the accuracy on the test set
print("Test Accuracy: " + str(by_acc_evaluator.evaluate(by_predictions, {by_acc_evaluator.metricName: "accuracy"})))
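One last note from me (not part of the submission): Spark's multinomial NaiveBayes needs non-negative features, which the Min-Max scaling above conveniently guarantees. Its main knob is the Laplace smoothing term, and a quick sketch of varying it looks like this:

# Sketch: try a couple of smoothing values for NaiveBayes (default is 1.0)
for s in [0.5, 1.0, 2.0]:
    nb_s = NaiveBayes(featuresCol='features', labelCol='label', smoothing=s)
    preds_s = nb_s.fit(train).transform(test)
    acc_s = by_acc_evaluator.evaluate(preds_s, {by_acc_evaluator.metricName: "accuracy"})
    print("smoothing=%.1f -> test accuracy %.4f" % (s, acc_s))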
That's it, I guess... the annotations are all in the notebook anyway...
I've been way too lazy lately... sigh _(:з」∠)_