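"""Benchmark PySpark's built-in ALS on a rank-1 factorization of the MovieLens
ratings matrix, recording runtime and test-set MSE over repeated experiments
for comparison against the local ALSSparse implementation."""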
from pyspark.mllib.recommendation import ALS, Rating
from pyspark import SparkContext
import numpy as np
import time, json
from dataset import MovieLensDataset
from utils import load_matrix, save_matrix, sparse_matrix_to_csv
from argparse import ArgumentParser
from main import evaluate

sc = SparkContext(appName="RankOneALS")

SEED = 10
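# A fixed seed makes ALS's random initialization deterministic, so repeated
# experiments mainly measure runtime variability rather than model variance.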
if __name__ == "__main__":
    args = ArgumentParser()
    args.add_argument('-d',
                      '--dataset-path',
                      help='Absolute path of the CSV dataset to load',
                      required=False)
    args.add_argument('-e',
                      '--n-experiments',
                      help='Number of experiments to run',
                      type=int,
                      default=1,
                      required=False)
    args.add_argument('-i',
                      '--n-iterations',
                      help='Number of iterations to run the ALS algorithm for',
                      type=int,
                      default=1000)
    args.add_argument('-nn',
                      '--non-negative',
                      help='Solve the least-squares subproblems with '
                      'non-negativity constraints',
                      action='store_true',
                      default=False)
    args.add_argument('-w',
                      '--n-workers',
                      help='Number of workers used to split the dataset '
                      'into train and test',
                      type=int,
                      default=8)
    args = args.parse_args()
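    # Example invocation (the dataset path is hypothetical):
    #   spark-submit spark-als.py -d /data/ml-latest-small/ratings.csv -e 5 -i 100 -nn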
    try:
        print("Loading train and test split from /tmp/..")
        trainX = load_matrix('trainX_sparse', True)
        testX = load_matrix('testX_sparse', True)
    except Exception:
        # no cached split available: regenerate it from the raw dataset
        print("Loading failed, generating train-test split now..")
        dataset = MovieLensDataset(args.dataset_path, mode='sparse')
        # 5% test size
        test_set_size = dataset.n_ratings // 20
        trainX, testX = dataset.train_test_split_simple(test_set_size)
        print("Saving train and test set to /tmp/ first..")
        save_matrix('trainX_sparse', trainX)
        save_matrix('testX_sparse', testX)
    # dump the train matrix to csv so Spark can read it back as a text file
    print("Storing train matrix to csv..")
    sparse_matrix_to_csv('/tmp/trainX.csv', trainX)
    print("Reading train matrix from csv..")
    data = sc.textFile('/tmp/trainX.csv')
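    # Each line of /tmp/trainX.csv is expected to hold a "user,item,rating"
    # triple, which is parsed into pyspark Rating objects below.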
    ratings = data.map(lambda l: l.split(','))\
                  .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
    # ALS checkpoints its RDD lineage during long runs and crashes without a
    # checkpoint directory, so set one up front
    sc.setCheckpointDir('/tmp/')
    runtimes = []
    mses = []
    for exp_no in range(args.n_experiments):
        # Build the recommendation model using Alternating Least Squares
        rank = 1
        print("Running ALS..")
        start = time.time()
        model = ALS.train(ratings,
                          rank,
                          args.n_iterations,
                          lambda_=0.0,
                          nonnegative=args.non_negative,
                          seed=SEED)
        runtimes.append(time.time() - start)
        print(f"Runtime: {runtimes[-1]}s")
        # collect the learned user and item factor vectors from the model
        pf = model.productFeatures()
        v = np.matrix(np.asarray(pf.values().collect()).astype(np.float64))
        pf = model.userFeatures()
        u = np.matrix(np.asarray(pf.values().collect()).astype(np.float64))
        # keep the evaluation schema fixed wrt the other experiments
        mse = evaluate(u, v, testX, 'sparse')
        mses.append(mse)
print("Mean MSE:", sum(mses)/args.n_experiments)
print("Mean Runtime:", sum(runtimes)/args.n_experiments)
with open(f'./data/pyspark_als_{args.n_experiments}.json', 'w') as f:
json.dump({'mse': mses, 'runtime': runtimes}, f)
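    # The JSON written above holds one MSE and one runtime per experiment:
    # {"mse": [...], "runtime": [...]}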
    # Alternative: evaluate with Spark itself instead of the local evaluate():
    # testX = testX.toarray()
    # sparse_matrix_to_csv('/tmp/testX.csv', testX)
    # data = sc.textFile('/tmp/testX.csv')
    # ratings = data.map(lambda l: l.split(','))\
    #     .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
    # testdata = ratings.map(lambda p: (p[0], p[1]))
    # predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
    # ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
    # MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    # print("Mean Squared Error = " + str(MSE))

    # Compare with the local rank-one ALS implementation:
    # from als import ALSSparse
    # from main import init_vector
    # dataset = MovieLensDataset('/tmp/', mode='sparse')
    # u = init_vector(dataset.n_users, normalize=True)
    # v = init_vector(dataset.n_movies, normalize=True)
    # trainX, testX = dataset.train_test_split_simple(1000 // 20)
    # als = ALSSparse(u, v, trainX)  # avoid reusing the name `args` here
    # u, v = als.fit()
    # mse = evaluate(u, v, trainX, 'sparse')