-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfind.py
78 lines (56 loc) · 2.38 KB
/
find.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import click
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
def find_results(model_id: str, importances: np.ndarray):
    """Rank businesses by a weighted blend of overall and per-aspect scores.

    Reads ``business.json``, ``review.json`` and ``user.json`` from
    ``data/yelp`` and the aspect predictions from
    ``data/<model name>/aspects.json``, where the model name is ``model_id``
    with every "/" replaced by "-". Prints the five highest- and five
    lowest-scoring businesses as Markdown tables; nothing is returned.

    Args:
        model_id: Identifier of the model whose aspect file should be read.
        importances: One weight per score column, in the order
            overall, food, service, price, ambience, anecdotes.

    Raises:
        ValueError: If ``importances`` is not a 1-D sequence of six weights.
        KeyError: If one of the score columns is absent from the joined data.
    """
    score_columns = ["overall", "food", "service", "price", "ambience", "anecdotes"]

    # Validate the weights before starting Spark so a bad call fails
    # immediately instead of after the expensive joins.
    weights = np.asarray(importances, dtype=float)
    if weights.shape != (len(score_columns),):
        raise ValueError(
            f"importances must be a 1-D array of {len(score_columns)} weights "
            f"(one per column in {score_columns}); got shape {weights.shape}"
        )

    model_name = model_id.replace("/", "-")

    spark = (
        SparkSession.builder
        .appName("ASBA")
        .config("spark.driver.memory", "15g")
        .config("spark.sql.shuffle.partitions", "300")
        .getOrCreate()
    )

    businesses = spark.read.json("data/yelp/business.json")
    reviews = spark.read.json("data/yelp/review.json")
    users = spark.read.json("data/yelp/user.json")

    # Halve each review's deviation from `average_stars` and re-centre it on 3
    # (presumably `average_stars` is the reviewer's own mean rating from
    # user.json -- TODO confirm), then average per business.
    adjusted = (
        reviews.join(users, reviews.user_id == users.user_id)
        .withColumn(
            "adjusted_stars",
            (1 / 2) * (F.col("stars") - F.col("average_stars")) + 3,
        )
        .select("business_id", "stars", "adjusted_stars")
        .groupBy("business_id")
        .mean()
    )

    bdf = (
        businesses.alias("a")
        .join(adjusted.alias("b"), businesses.business_id == adjusted.business_id)
        .select(
            "a.business_id",
            "name",
            "city",
            "state",
            "stars",
            "review_count",
            F.col("avg(adjusted_stars)").alias("adjusted_stars"),
        )
        .toPandas()
    )
    bdf = bdf.set_index("business_id")

    # One column per distinct `aspect` value, holding the per-business mean of
    # the numeric columns selected here.
    aspects = spark.read.json(f"data/{model_name}/aspects.json")
    aspects = (
        aspects.select("business_id", "user_id", "aspect", "polarity")
        .groupBy(["business_id"])
        .pivot("aspect")
        .mean()
    )
    aspects = aspects.toPandas()
    aspects = aspects.set_index("business_id")
    # Affine rescale of the aspect means; maps [-1, 1] onto [1, 5] if polarity
    # lies in that range (assumption -- verify against the aspect model).
    aspects = 2 * aspects + 3

    # Inner join on the business_id index: keep only businesses present in both.
    results = pd.concat([bdf, aspects], join="inner", axis=1)
    results = results.rename({"adjusted_stars": "overall"}, axis=1)

    # DataFrame.get() would silently return None for a missing column and the
    # failure would surface as an AttributeError; report the real cause instead.
    missing = [column for column in score_columns if column not in results.columns]
    if missing:
        raise KeyError(
            f"score columns missing from results: {missing}; "
            f"available columns: {list(results.columns)}"
        )

    results["score"] = results[score_columns].to_numpy() @ weights
    results = results.sort_values("score", ascending=False)

    print("\n")
    top = results[:5].round(3)
    print("Top")
    print("===")
    print(top.to_markdown())

    print("\n")
    # Ascending by score, with the stars column as tie-breaker.
    bottom = results.sort_values(["score", "stars"])[:5].round(3)
    print("Bottom")
    print("======")
    print(bottom.to_markdown())
@click.command()
@click.option("--model-id", type=str, required=True)
@click.option("--importances", type=str, required=True)
def main(model_id: str, importances: str):
    """Print top and bottom businesses for MODEL_ID using weighted scores.

    --importances is a comma-separated list of numeric weights; the weights
    are normalised to sum to 1 before being passed to find_results.
    """
    try:
        weights = np.array([float(token) for token in importances.split(",")])
    except ValueError as err:
        # Surface a usage error instead of a raw traceback for bad numbers.
        raise click.BadParameter(
            f"expected comma-separated numbers, got {importances!r}",
            param_hint="--importances",
        ) from err

    total = weights.sum()
    # A zero or non-finite total would turn every weight into NaN/inf.
    if total == 0 or not np.isfinite(total):
        raise click.BadParameter(
            "weights must have a finite, non-zero sum",
            param_hint="--importances",
        )

    find_results(model_id, weights / total)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()