# PCA_implementation.py
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.PCA.html
# %% Import required libraries
import os
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, exp
from pyspark.sql.types import StructType
from pyspark.sql import functions as F
from utils.data_struct_init import final_struct, columns_to_analyze, data_schema_otherwise
# Spark session with modest shuffle and driver-memory settings
spark = (
    SparkSession.builder
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)
file_path = r"C:\Users\sachi\pyspark_tutorial\muse_pipeline\Telepathic-Navigation\muse_dataset\Trial_2\Outlier_filtered_dataset.csv"
df = spark.read.csv(file_path, header=True, schema=StructType(data_schema_otherwise))
# Reduce the number of partitions for this relatively small dataset
df = df.coalesce(1)  # or a small number such as 2-4
# Show the shape of the DataFrame
print(f"Shape: {df.count(), len(df.columns)}")
# %% Now applying PCA on the data
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Select the feature columns (all but the first and last entries of columns_to_analyze)
feature_columns = columns_to_analyze[1:-1]
# Assemble the columns to a single column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
# Transform the data
df = assembler.transform(df)
# Keep only the assembled features and the label
df = df.select("features", "label")
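# Optional sketch (not part of the original pipeline): PCA is sensitive to feature
# scale, so standardizing the assembled vector first is often worthwhile. The
# "scaled_features" column name and the df_scaled variable are illustrative; to adopt
# this step, point the PCA inputCol below at "scaled_features".
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
df_scaled = scaler.fit(df).transform(df)  # demo only; the pipeline below keeps using df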
# First split the dataframe
train_df, test_df = df.randomSplit([0.7, 0.3], seed=123)
# Report the split sizes
print("Training set size:", train_df.count())
print("Testing set size:", test_df.count())
# %% Apply PCA
# Fit PCA on the training features only, then project both splits,
# keeping the label column so rows stay aligned
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca.fit(train_df)
train_data = model.transform(train_df).select("pca_features", "label")
test_data = model.transform(test_df).select("pca_features", "label")
train_data.cache()
test_data.cache()
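# Quick sanity check: explainedVariance on the fitted PCAModel reports the proportion
# of variance captured by each of the k components, which helps judge whether k=2 is enough
print("Explained variance per component:", model.explainedVariance)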
# %% Now we can use the PCA features to train the Logistic Regression model
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# train_data and test_data from the PCA step already contain both the projected
# features and the label, so no re-joining is needed here
# Initialize Logistic Regression
lr = LogisticRegression(
featuresCol="pca_features", # PCA transformed features
labelCol="label",
maxIter=100, # maximum iterations
regParam=0.1 # regularization parameter
)
# Fit the model
lr_model = lr.fit(train_data)
# Make predictions on test data
predictions = lr_model.transform(test_data)
# Evaluate the model
evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")
# Additional evaluation metrics from the training summary
print("Training summary:")
training_summary = lr_model.summary
# areaUnderROC is only defined for binary problems, so guard the access
if hasattr(training_summary, "areaUnderROC"):
    print(f"Area under ROC: {training_summary.areaUnderROC}")
print(f"F-measure by label: {training_summary.fMeasureByLabel()}")
# Print model coefficients (binary models expose `coefficients`;
# multiclass models expose `coefficientMatrix` instead)
print("\nModel Coefficients:")
for feature, coef in enumerate(lr_model.coefficients):
    print(f"Feature {feature}: {coef}")
# %% Save the model to disk
file_path = r"C:\Users\sachi\pyspark_tutorial\muse_pipeline\Telepathic-Navigation\muse_dataset\lr_model"
# Save the model; write().overwrite() avoids failing if the path already exists
lr_model.write().overwrite().save(file_path)
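# Sketch (assumption): the fitted PCAModel can be persisted the same way so the exact
# projection can be re-applied at inference time; the path below is illustrative
pca_model_path = r"C:\Users\sachi\pyspark_tutorial\muse_pipeline\Telepathic-Navigation\muse_dataset\pca_model"
model.write().overwrite().save(pca_model_path)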
# %% Load the model from disk when needed
from pyspark.ml.classification import LogisticRegressionModel
loaded_model = LogisticRegressionModel.load(file_path)
# Use the loaded model
predictions = loaded_model.transform(test_data)
# %% Plot the predictions against the actual labels
import matplotlib.pyplot as plt
import numpy as np
# Convert the prediction and label columns to numpy arrays
# (use new names so the predictions DataFrame is not overwritten)
pred_values = np.array(predictions.select("prediction").collect()).ravel()
label_values = np.array(predictions.select("label").collect()).ravel()
# Plot the predictions against the actual labels
plt.figure(figsize=(10, 6))
plt.plot(pred_values, label="Predictions")
plt.plot(label_values, label="Actual Labels")
plt.title("Predictions vs Actual Labels")
plt.legend()
plt.show()
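# %% Visualize the PCA space
# Since k=2, the projected test data can be plotted directly; this sketch scatters the
# test points in PCA space, colored by their true label
pca_rows = test_data.select("pca_features", "label").collect()
pca_xy = np.array([row["pca_features"].toArray() for row in pca_rows])
pca_labels = np.array([row["label"] for row in pca_rows])
plt.figure(figsize=(8, 6))
plt.scatter(pca_xy[:, 0], pca_xy[:, 1], c=pca_labels, cmap="viridis", s=10)
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.title("Test data in PCA space")
plt.colorbar(label="label")
plt.show()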
# %% Extending the code to classify data using Random Forest
from pyspark.ml.classification import RandomForestClassifier
# Initialize Random Forest
rf = RandomForestClassifier(
featuresCol="pca_features", # PCA transformed features
labelCol="label",
maxDepth=5, # maximum depth of the tree
numTrees=20 # number of trees in the forest
)
# Fit the model
rf_model = rf.fit(train_data)
# Make predictions on test data
predictions = rf_model.transform(test_data)
# Evaluate the model
evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")
# %%