-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquery_2.py
52 lines (40 loc) · 1.47 KB
/
query_2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from load_data import *
from pyspark.sql.functions import rank, dense_rank, row_number
#### ==== QUERY 2 ====
# For each month, find the trip with the highest toll amount.
# Trips whose toll amount is zero are ignored.
# -----------------------------------------------
# === SQL ===
# Spark SQL formulation: number the non-zero-toll trips inside each pickup month
# by descending tolls_amount and keep the first one per month. Because the outer
# query is `SELECT *`, the result also carries the helper `rank` column.
sql_str = (
    "WITH ranked_trips AS ( "
    " SELECT *, "
    " ROW_NUMBER() "
    " OVER (PARTITION BY MONTH(tpep_pickup_datetime) ORDER BY tolls_amount DESC) AS rank "
    " FROM taxi_trips "
    " WHERE tolls_amount > 0 "
    ") "
    "SELECT * "
    "FROM ranked_trips "
    "WHERE rank = 1 "
    "ORDER BY MONTH(tpep_pickup_datetime);"
)
# Running the SQL version is currently disabled; uncomment to time it.
# start_time = time.time()
# res = spark.sql(sql_str)
# res.show()
# print('Total time for SQL: ', time.time() - start_time, 'sec')
# -----------------------------------------------
# === DataFrame ===
# Benchmark: run the DataFrame version of Query 2 `n_iter` times, show the
# result once, then print the mean elapsed time and append it to the results
# file `f`. `n_iter`, `taxi_trips_df`, `Window`, `month`, `col`, `time` and `f`
# are all expected to come from `from load_data import *`.
try:
    # With n_iter < 1 the loop never runs: `res` would be undefined and the
    # average would divide by zero, so fail early with a clear message.
    if n_iter < 1:
        raise ValueError(f"n_iter must be a positive integer, got {n_iter!r}")

    # One window per pickup month, highest toll first. The spec does not depend
    # on the iteration, so it is built once, outside the timed region.
    # NOTE(review): partitioning by MONTH alone merges the same month of
    # different years; this assumes the dataset spans a single year — confirm.
    # NOTE(review): row_number() breaks ties on tolls_amount arbitrarily, so the
    # reported trip may differ between runs when several trips share the max.
    window = Window.partitionBy(month(col("tpep_pickup_datetime"))).orderBy(
        col("tolls_amount").desc()
    )

    total_time = 0.0
    for _ in range(n_iter):
        # perf_counter() is monotonic and high-resolution; time.time() follows
        # the system clock and can jump if it is adjusted during the run.
        start_time = time.perf_counter()
        res = (
            taxi_trips_df.filter(col("tolls_amount") > 0)  # ignore zero tolls
            .withColumn("rank", row_number().over(window))
            .filter(col("rank") == 1)  # keep only the top trip of each month
            .orderBy(month(col("tpep_pickup_datetime")).asc())
            .drop("rank")
        )
        # DataFrames are lazy; count() forces the query to execute so that the
        # elapsed time covers the actual work.
        res.count()
        total_time += time.perf_counter() - start_time

    res.show()
    avg_time = total_time / n_iter
    print('Average time for DataFrame: ', str(avg_time), 'sec')
    f.write('Average Time for Q2: ' + str(avg_time) + '\n')
finally:
    # Close the results file even if the query above raises.
    f.close()