// 3. Data Reduction.scala
// /opt/spark/bin/spark-shell --packages JMailloH:kNN_IS:3.0,djgarcia:SmartReduction:1.0,djgarcia:Equal-Width-Discretizer:1.0 --jars /home/administrador/datasets/mdlp-mrmr.jar
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
sc.setLogLevel("ERROR")
// Load Train & Test
val pathTrain = "file:///home/administrador/datasets/susy-10k-tra.data"
val rawDataTrain = sc.textFile(pathTrain)
val pathTest = "file:///home/administrador/datasets/susy-10k-tst.data"
val rawDataTest = sc.textFile(pathTest)
// Train & Test RDDs: parse each CSV line; the last column is the class label
val train = rawDataTrain.map { line =>
  val array = line.split(",")
  val arrayDouble = array.map(_.toDouble)
  val featureVector = Vectors.dense(arrayDouble.init)
  val label = arrayDouble.last
  LabeledPoint(label, featureVector)
}.repartition(16)
val test = rawDataTest.map { line =>
  val array = line.split(",")
  val arrayDouble = array.map(_.toDouble)
  val featureVector = Vectors.dense(arrayDouble.init)
  val label = arrayDouble.last
  LabeledPoint(label, featureVector)
}.repartition(16)
train.persist()
test.persist()
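// Optional sanity check: materialize the cached RDDs
// (the file names suggest roughly 10k instances per split)
train.count()
test.count()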
// Encapsulate Learning Algorithms
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.rdd.RDD
// Train a decision tree on `train` and return its accuracy on `test`
def trainDT(train: RDD[LabeledPoint], test: RDD[LabeledPoint], maxDepth: Int = 5): Double = {
  val numClasses = 2
  val categoricalFeaturesInfo = Map[Int, Int]()
  val impurity = "gini"
  val maxBins = 32
  val model = DecisionTree.trainClassifier(train, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)
  val labelAndPreds = test.map { point =>
    val prediction = model.predict(point.features)
    (point.label, prediction)
  }
  val testAcc = 1 - labelAndPreds.filter(r => r._1 != r._2).count().toDouble / test.count()
  testAcc
}
import org.apache.spark.mllib.classification.kNN_IS.kNN_IS
import org.apache.spark.mllib.evaluation._
import org.apache.spark.rdd.RDD
// Train kNN_IS on `train` and return its accuracy on `test`
// (MulticlassMetrics.precision with no label argument equals the overall accuracy)
def trainKNN(train: RDD[LabeledPoint], test: RDD[LabeledPoint], k: Int = 3): Double = {
  val numClass = train.map(_.label).distinct().collect().length
  val numFeatures = train.first().features.size
  val knn = kNN_IS.setup(train, test, k, 2, numClass, numFeatures, train.getNumPartitions, 2, -1, 1)
  val predictions = knn.predict(sc)
  val metrics = new MulticlassMetrics(predictions)
  metrics.precision
}
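// Baseline for reference: accuracy of both learners on the full training set,
// so each reduction technique below can be compared against it
trainDT(train, test)
trainKNN(train, test)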
/*****Instance Selection*****/
// FCNN_MR
import org.apache.spark.mllib.feature._
val k = 3 // number of neighbors
val fcnn_mr_model = new FCNN_MR(train, k)
val fcnn_mr = fcnn_mr_model.runPR()
fcnn_mr.persist()
fcnn_mr.count()
trainDT(fcnn_mr, test)
trainKNN(fcnn_mr, test)
// RMHC_MR
import org.apache.spark.mllib.feature._
val p = 0.1 // percentage of instances (max 1.0)
val it = 100 // number of iterations
val k = 3 // number of neighbors
val rmhc_mr_model = new RMHC_MR(train, p, it, k, 48151623) // last argument: random seed
val rmhc_mr = rmhc_mr_model.runPR()
rmhc_mr.persist()
rmhc_mr.count()
trainDT(rmhc_mr, test)
trainKNN(rmhc_mr, test)
// SSMA-SFLSDE_MR
import org.apache.spark.mllib.feature._
val ssmasflsde_mr_model = new SSMASFLSDE_MR(train)
val ssmasflsde_mr = ssmasflsde_mr_model.runPR()
ssmasflsde_mr.persist()
ssmasflsde_mr.count()
trainDT(ssmasflsde_mr, test)
trainKNN(ssmasflsde_mr, test)
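// Reduction rates: fraction of training instances each selection method removed
val nTrain = train.count().toDouble
1.0 - fcnn_mr.count() / nTrain
1.0 - rmhc_mr.count() / nTrain
1.0 - ssmasflsde_mr.count() / nTrain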
/*****Discretization*****/
// Equal Width Discretizer
import org.apache.spark.mllib.feature._
val nBins = 25 // Number of bins
val discretizerModel = new EqualWidthDiscretizer(train, nBins).calcThresholds()
val discretizedTrain = discretizerModel.discretize(train)
val discretizedTest = discretizerModel.discretize(test)
discretizedTrain.first
discretizedTest.first
trainDT(discretizedTrain, discretizedTest)
trainKNN(discretizedTrain, discretizedTest)
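// Quick check on the first feature: after equal-width discretization it should
// take at most nBins distinct values
discretizedTrain.map(_.features(0)).distinct().count()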
// MDLP
// MDLP works on the ml API, so convert the mllib LabeledPoints to a Dataset
// (spark.implicits are already in scope inside spark-shell, so toDS is available)
import org.apache.spark.ml.feature.{MDLPDiscretizer, LabeledPoint => NewLabeledPoint}
val mdlpTrain = train.map(l => NewLabeledPoint(l.label, l.features.asML)).toDS()
val mdlpTest = test.map(l => NewLabeledPoint(l.label, l.features.asML)).toDS()
val bins = 25
val discretizer = new MDLPDiscretizer().setMaxBins(bins).setMaxByPart(10000).setInputCol("features").setLabelCol("label").setOutputCol("bucketedFeatures")
val model = discretizer.fit(mdlpTrain)
// Convert back to mllib LabeledPoints so the evaluation helpers can be reused
val trainDisc = model.transform(mdlpTrain).rdd.map(row => LabeledPoint(
  row.getAs[Double]("label"),
  Vectors.dense(row.getAs[org.apache.spark.ml.linalg.Vector]("bucketedFeatures").toArray)
))
val testDisc = model.transform(mdlpTest).rdd.map(row => LabeledPoint(
  row.getAs[Double]("label"),
  Vectors.dense(row.getAs[org.apache.spark.ml.linalg.Vector]("bucketedFeatures").toArray)
))
trainDT(trainDisc, testDisc)
trainKNN(trainDisc, testDisc)
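// Unlike equal-width binning, MDLP picks cut points by an entropy criterion, so a
// feature may end up with far fewer than maxBins distinct values; check the first one
trainDisc.map(_.features(0)).distinct().count()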
/*****Feature Selection*****/
// ChiSq
import org.apache.spark.mllib.feature.ChiSqSelector
val numFeatures = 5 // number of features to select
val selector = new ChiSqSelector(numFeatures)
val transformer = selector.fit(train)
val chisqTrain = train.map { lp =>
LabeledPoint(lp.label, transformer.transform(lp.features))
}
val chisqTest = test.map { lp =>
LabeledPoint(lp.label, transformer.transform(lp.features))
}
chisqTrain.first.features.size
trainDT(chisqTrain, chisqTest)
trainKNN(chisqTrain, chisqTest)
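// Indices of the original features that ChiSq kept
// (selectedFeatures is a field of mllib's ChiSqSelectorModel)
transformer.selectedFeatures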
// PCA
import org.apache.spark.mllib.feature.PCA
val numFeatures = 5
val pca = new PCA(numFeatures).fit(train.map(_.features))
val projectedTrain = train.map(p => p.copy(features = pca.transform(p.features)))
val projectedTest = test.map(p => p.copy(features = pca.transform(p.features)))
projectedTrain.first.features.size
projectedTest.first.features.size
trainDT(projectedTrain, projectedTest)
trainKNN(projectedTrain, projectedTest)
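// Variance retained by each principal component
// (explainedVariance is exposed by mllib's PCAModel in Spark 2.x)
pca.explainedVariance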
// mRMR (requires discrete features, so it reuses the MDLP-discretized RDDs from above)
import org.apache.spark.mllib.feature._
val criterion = new InfoThCriterionFactory("mrmr")
val nToSelect = 5
val nPartitions = trainDisc.getNumPartitions
val featureSelector = new InfoThSelector(criterion, nToSelect, nPartitions).fit(trainDisc)
val reducedTrain = trainDisc.map(i => LabeledPoint(i.label, featureSelector.transform(i.features)))
reducedTrain.first()
val reducedTest = testDisc.map(i => LabeledPoint(i.label, featureSelector.transform(i.features)))
trainDT(reducedTrain, reducedTest)
trainKNN(reducedTrain, reducedTest)