SIT742: About Assignment A2

Argh, A1 made me so mad.jpg

Total throwaway... I'll just wing A2 this time, since no matter what I do it ends up in the 70s... so frustrating _(:з」∠)_

Probably because of the marking thing, I put a lot of annotations into A2 itself, so I won't cram any more commentary into this post... yeah.

!pip install wget
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark.sql import SparkSession 
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext.getOrCreate()
import wget
link_to_data = 'https://github.com/tulip-lab/sit742/raw/master/Assessment/2019/data/bank.csv'
DataSet = wget.download(link_to_data)

!ls

# Import the 'bank.csv' as a Spark dataframe and name it as df
spark = SparkSession.builder.appName('ml-bank').getOrCreate()
df = spark.read.csv('bank.csv', header = True, inferSchema = True) 

# Check data distribution
# You may use printSchema()

#Use printSchema() to show the schema of the dataset
df.printSchema()

#Showing first 5 rows of data
df.show(5)

#Use describe() to show the distribution of the numerical attributes.
df.describe(['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous']).show()
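#(Extra sketch, not required by the task) Besides describe(), a quick look at the class
#balance of the target column 'deposit' helps judge how skewed the data is.
df.groupBy('deposit').count().show()
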
#Select features ('age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan', 'campaign', 'pdays', 'previous', 'poutcome', 'deposit') as df2
#Selecting required attributes as df2 
df2=df.select('age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan', 'campaign', 'pdays', 'previous', 'poutcome', 'deposit')

#Show the schema of df2
df2.printSchema()

#Showing first 5 rows of df2
df2.show(5)


#Remove invalid rows/records using spark.sql 
#Register df2 as the temporary view "bank" (createOrReplaceTempView is the non-deprecated form of registerTempTable)
df2.createOrReplaceTempView("bank")

#Use spark.sql to filter out rows where an attribute value is "unknown"
#Not every attribute contains "unknown" values, so only the ones that do are listed in this query.
#Also, poutcome should only be "success" or "failure", so rows with "other" in this attribute are filtered out as well.
df3=spark.sql("SELECT * FROM bank WHERE job <> 'unknown' AND education <> 'unknown' AND poutcome <> 'unknown' AND poutcome <> 'other'")

#Show the schema of the filtered dataframe
df3.printSchema() 

#...and the first 5 rows
df3.show(5)
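
#(Extra sketch) The same filtering can also be written with the DataFrame API instead of spark.sql;
#this is only an illustration of the equivalent filter, the spark.sql version above is what is used.
from pyspark.sql.functions import col
df3_alt = df2.filter((col('job') != 'unknown') &
                     (col('education') != 'unknown') &
                     (~col('poutcome').isin('unknown', 'other')))
df3_alt.count()  #should match df3.count()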


#Convert categorical features to metric features using one-hot encoding
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler

#Use StringIndexer to turn the categorical attributes into numeric indexes, ready for one-hot encoding.
#Only the categorical columns are indexed; the numeric columns and the label "deposit" are excluded.
indexers=[StringIndexer(inputCol=column, outputCol=column+"_index").fit(df3) for column in list(set(df3.columns)-set(['age', 'balance', 'campaign', 'pdays', 'previous','deposit']))]

#Transform all of the categorical attributes at once with the help of a Pipeline.
pipeline=Pipeline(stages=indexers)
df4=pipeline.fit(df3).transform(df3)

#Define which index columns get one-hot encoded, and the names of the output columns.
encoder=OneHotEncoderEstimator(inputCols=["housing_index", "marital_index", "default_index", "loan_index", "job_index", "education_index", "poutcome_index"],
                              outputCols=["housing_vec", "marital_vec", "default_vec", "loan_vec", "job_vec", "education_vec", "poutcome_vec"])

#Fit the one-hot encoder and apply the transformation.
model_ohe=encoder.fit(df4)
df5=model_ohe.transform(df4)

#Drop the original and indexed categorical columns, keeping only the one-hot vectors and the numerical attributes.
df6=df5.select([column for column in df5.columns if column not in ["housing", "marital", "default", "loan", "job", "education", "poutcome", 
                                                                  "housing_index", "marital_index", "default_index", "loan_index", "job_index", "education_index", "poutcome_index"]])

#Assemble all features into a single vector column in preparation for normalisation.
assembler=VectorAssembler(inputCols=["age", "balance", "campaign", "pdays", "previous", "job_vec", "education_vec", "marital_vec", "poutcome_vec", "housing_vec", "loan_vec", "default_vec"], outputCol="features")
df7=assembler.transform(df6)

#Keep only the assembled feature column.
df8=df7.select("features")

#...and show its schema
df8.printSchema()

#Bibliography
#https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml
#https://stackoverflow.com/questions/36942233/apply-stringindexer-to-several-columns-in-a-pyspark-dataframe
#https://spark.apache.org/docs/latest/ml-features.html#onehotencoderestimator
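
#(Extra sketch, assumption: same column lists as above) The StringIndexer, OneHotEncoderEstimator
#and VectorAssembler steps could also be chained into a single Pipeline, which keeps the whole
#feature preparation reusable on new data.
cat_cols = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'poutcome']
num_cols = ['age', 'balance', 'campaign', 'pdays', 'previous']
stages = [StringIndexer(inputCol=c, outputCol=c + '_index') for c in cat_cols]
stages.append(OneHotEncoderEstimator(inputCols=[c + '_index' for c in cat_cols],
                                     outputCols=[c + '_vec' for c in cat_cols]))
stages.append(VectorAssembler(inputCols=num_cols + [c + '_vec' for c in cat_cols],
                              outputCol='features'))
feature_pipeline = Pipeline(stages=stages)
df8_alt = feature_pipeline.fit(df3).transform(df3).select('features')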


from pyspark.ml.feature import StringIndexer
#The attribute "deposit" is also the label, so select it into its own dataframe as y
y_df1=df3.select("deposit")

#...and transform the label into an index with StringIndexer
indexer=StringIndexer(inputCol="deposit", outputCol="deposit_index")
y_df2=indexer.fit(y_df1).transform(y_df1)

#...and store as y
y=y_df2.select('deposit_index')

y.printSchema()
#Apply Min-Max normalisation on each attribute using MinMaxScaler  

from pyspark.ml.feature import MinMaxScaler

#Use MinMaxScaler to normalise the "features" column
scaler=MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel=scaler.fit(df8)
scaled=scalerModel.transform(df8)

#Print the scaled range
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))

#...and also store as dataframe X
X=scaled.select("scaledFeatures")
X.show(truncate=False)
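#(Extra sketch) The fitted MinMaxScalerModel also keeps the per-feature minimum and maximum it
#learned, which is handy for sanity-checking the scaling.
print(scalerModel.originalMin)
print(scalerModel.originalMax)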

#Bibliography
#https://spark.apache.org/docs/2.2.0/ml-features.html#minmaxscaler


from pyspark.sql.functions import monotonically_increasing_id 

#Generate an "id" column on both X and y so they can be joined row by row
X_index = X.select("*").withColumn("id", monotonically_increasing_id())
y_index = y.select("*").withColumn("id", monotonically_increasing_id())

#Merging X and y into one dataframe
data_model = y_index.join(X_index, "id", "outer").drop("id") 

#Rename the columns to "label" and "features" and store as df100
df100=data_model.toDF('label', 'features')
df100.show()
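
#(Extra sketch, assumption) monotonically_increasing_id() only guarantees increasing ids, not the
#same consecutive ids on both sides, so the join above relies on X and y sharing the same
#partitioning. A more robust way to pair rows by position is zipWithIndex on the underlying RDDs.
X_rdd = X.rdd.zipWithIndex().map(lambda r: (r[1], r[0][0]))
y_rdd = y.rdd.zipWithIndex().map(lambda r: (r[1], r[0][0]))
df100_alt = spark.createDataFrame(y_rdd.join(X_rdd).map(lambda r: (float(r[1][0]), r[1][1])),
                                  ['label', 'features'])
df100_alt.show(5)
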
# Perform unsupervised learning on df2 with k-means 
# You can use whole df2 as both training and testing data, 
# Evaluate the clustering result using Accuracy.  

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator 
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import DoubleType

#Set up the k-means parameters
#Since the clusters should approximate the binary label, we use only 2 clusters...
k_means=KMeans().setK(2).setSeed(1)

#Fit the k-means model and apply it to the data
cluster=k_means.fit(df100)
cluster_predictions=cluster.transform(df100)

#First evaluator for the clustering: ClusteringEvaluator, using the silhouette criterion
evaluator=ClusteringEvaluator()
silhouette = evaluator.evaluate(cluster_predictions)
print("Silhouette with squared euclidean distance: " + str(silhouette))

#The second evaluator: MulticlassClassificationEvaluator, using the accuracy criterion
kmeans_acc_evaluator = MulticlassClassificationEvaluator()

#But first the k-means prediction column has to be cast from integer to double so the evaluator accepts it
cluster_predictions.printSchema()
cluster_predictions_1 = cluster_predictions.withColumn("prediction", cluster_predictions["prediction"].cast("double"))

#Then compute the clustering accuracy against the true label
print("Test Accuracy: " + str(kmeans_acc_evaluator.evaluate(cluster_predictions_1, {kmeans_acc_evaluator.metricName: "accuracy"})))
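#(Extra sketch, assumption) The k-means cluster ids 0/1 are arbitrary, so they may be flipped
#relative to the deposit label; comparing both alignments gives a fairer accuracy figure.
flipped = cluster_predictions_1.withColumn("prediction", 1.0 - cluster_predictions_1["prediction"])
acc_direct = kmeans_acc_evaluator.evaluate(cluster_predictions_1, {kmeans_acc_evaluator.metricName: "accuracy"})
acc_flipped = kmeans_acc_evaluator.evaluate(flipped, {kmeans_acc_evaluator.metricName: "accuracy"})
print("Cluster accuracy with the better label alignment: " + str(max(acc_direct, acc_flipped)))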

#Show the cluster centroids.
centers = cluster.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

#...and show some predictions from the k-means model
cluster_predictions_1.show()

#Bibliography
#https://stackoverflow.com/questions/32284620/how-to-change-a-dataframe-column-from-string-type-to-double-type-in-pyspark
#https://spark.apache.org/docs/2.4.2/ml-clustering.html
#https://jakevdp.github.io/PythonDataScienceHandbook/05.11-k-means.html
#Generate a scatter plot using the first two PCA components to investigate the data distribution.
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.ml.linalg import Vectors
import matplotlib.pyplot as plt
import numpy as np

#Set up PCA with only 2 components for plotting
pca = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures")

#Fit and apply PCA to the scaled features (X)
model_pca = pca.fit(X)
result_pca = model_pca.transform(X)

#Show some of the PCA output
result_pca.show(truncate=False)
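#(Extra sketch) PCAModel also reports the fraction of variance captured by each component,
#which indicates how representative a 2-D scatter plot is of the full feature space.
print("Explained variance per component: " + str(model_pca.explainedVariance))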

#Convert the PCA components to a numpy array for plotting...
#...working with a numpy array also makes it easy to split the two components for the x and y axes.
np_pca_1 = np.array(result_pca.select('pcaFeatures').collect())

#Drop the middle dimension so the array has shape (n_rows, 2)
np_pca = np_pca_1[ :, 0, :]

#Scatter plot settings, including size of graph, title, x/y labels, and dot size
plt.figure(dpi=120)
plt.suptitle('PCA Features Scatter Plot', fontsize=15)
plt.xlabel('component 0', fontsize=10)
plt.ylabel('component 1', fontsize=10)
plt.scatter(np_pca[:,0], np_pca[:,1], s=5, alpha=1)

#Bibliography
#https://stackoverflow.com/questions/12444716/how-do-i-set-the-figure-title-and-axes-labels-font-size-in-matplotlib
#https://stackoverflow.com/questions/34007632/how-to-remove-a-column-in-a-numpy-array/34008274
#https://stackoverflow.com/questions/42116143/extracting-numpy-array-from-pyspark-dataframe
#Split the dataframe into training and test sets
train, test = df100.randomSplit([0.7, 0.3], seed = 742)

#...and print how many rows are in each set.
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))
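#(Extra sketch) It is also worth checking that the label distribution is similar in both splits.
train.groupBy('label').count().show()
test.groupBy('label').count().show()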

#Bibliography
#https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa
#This reference covers most of the supervised learning part.


# Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

#Set up the LogisticRegression parameters
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)

#Training LogisticRegression model
lrModel = lr.fit(train)

#...and apply to test set
lr_predictions = lrModel.transform(test)

#Show some predictions from LogisticRegression on the test set
lr_predictions.select('label', 'features', 'rawPrediction', 'prediction', 'probability').show(10)

#Select BinaryClassificationEvaluator and MulticlassClassificationEvaluator as the ROC/accuracy evaluators
lr_evaluator = BinaryClassificationEvaluator()
lr_acc_evaluator = MulticlassClassificationEvaluator()

#Get the training summary of the LogisticRegression model
lr_trainingSummary = lrModel.summary

#Convert the summary's ROC data to a pandas dataframe
lr_roc = lr_trainingSummary.roc.toPandas()

#...and start plotting
plt.figure(dpi=120)
plt.title('ROC Curve', fontsize=15)
plt.xlabel('False Positive Rate', fontsize=10)
plt.ylabel('True Positive Rate', fontsize=10)
plt.plot(lr_roc['FPR'],lr_roc['TPR'])
plt.show()

#Showing AUC for training set
print('Training set areaUnderROC: ' + str(lr_trainingSummary.areaUnderROC))

#Showing AUC for test set
print("Test Area Under ROC: " + str(lr_evaluator.evaluate(lr_predictions, {lr_evaluator.metricName: "areaUnderROC"})))

#Showing accuracy for Logistic Regression predicting test set.
print("Test Accuracy: " + str(lr_acc_evaluator.evaluate(lr_predictions, {lr_acc_evaluator.metricName: "accuracy"})))

#Examine the coefficients
import matplotlib.pyplot as plt
import numpy as np

#Store the coefficients in a numpy array (sorted for plotting)
beta = np.sort(lrModel.coefficients)

#...and plotting
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()
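#(Extra sketch) np.sort() above loses the link between coefficients and feature positions;
#the raw coefficients and the intercept can also be inspected directly.
print(lrModel.coefficients)
print("Intercept: " + str(lrModel.intercept))
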
#Decision tree
from pyspark.ml.classification import DecisionTreeClassifier 

#Setting up Decision Tree Classifier
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 5)

#Training Decision Tree model...
dtModel = dt.fit(train)

#...and apply to test set
dt_predictions = dtModel.transform(test)

#Show some predictions from the Decision Tree model on the test set
dt_predictions.select('label', 'features', 'rawPrediction', 'prediction', 'probability').show(10)

#Use the same two evaluators as for LogisticRegression
dt_evaluator = BinaryClassificationEvaluator()
dt_acc_evaluator = MulticlassClassificationEvaluator()

#Showing AUC for test set
print("Test Area Under ROC: " + str(dt_evaluator.evaluate(dt_predictions, {dt_evaluator.metricName: "areaUnderROC"})))

#Showing accuracy for test set
print("Test Accuracy: " + str(dt_acc_evaluator.evaluate(dt_predictions, {dt_acc_evaluator.metricName: "accuracy"})))
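#(Extra sketch) The learned tree itself can be printed to see which features drive the splits.
print(dtModel.toDebugString)
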
#NaiveBayes
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Setting up NaiveBayes model
by = NaiveBayes(featuresCol = 'features', labelCol = 'label')

#Training NaiveBayes model...
byModel = by.fit(train)

#...and apply to test set
by_predictions = byModel.transform(test)

#Show some predictions from the NaiveBayes model on the test set
by_predictions.select('label', 'features', 'rawPrediction', 'prediction', 'probability').show(10)

#Use the same two evaluators as for LogisticRegression
by_evaluator = BinaryClassificationEvaluator()
by_acc_evaluator = MulticlassClassificationEvaluator()

#Showing AUC for test set
print("Test Area Under ROC: " + str(by_evaluator.evaluate(by_predictions, {by_evaluator.metricName: "areaUnderROC"})))

#Showing accuracy for test set
print("Test Accuracy: " + str(by_acc_evaluator.evaluate(by_predictions, {by_acc_evaluator.metricName: "accuracy"})))
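
#(Extra sketch) A quick confusion matrix for any of the models can be obtained with a groupBy;
#shown here for the Naive Bayes predictions.
by_predictions.groupBy('label', 'prediction').count().show()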

That's it, I guess... the annotations are all in the notebook anyway...

I've been way too lazy lately... sigh _(:з」∠)_
