-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb_driver.py
111 lines (88 loc) · 3.08 KB
/
db_driver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import random
from abc import ABC, abstractmethod
import time
"""
@Author: Paula Munoz - University of Malaga
"""
class DatabaseDriver(ABC):
"""
MANAGEMENT METHODS
"""
@abstractmethod
def close(self):
pass
@abstractmethod
def reset_database(self):
pass
@abstractmethod
def initialize_database(self, filename):
pass
@abstractmethod
def import_data_from_csv(self, filename):
pass
"""
QUERIES
"""
@abstractmethod
def car_follow_the_line(self, parameters: dict):
pass
@abstractmethod
def cars_in_squared_area(self, parameters: dict):
pass
@abstractmethod
def cars_collided(self, parameters: dict):
pass
@abstractmethod
def arm_servo_avg_angle(self, parameters: dict):
pass
@abstractmethod
def arm_servo_aggregated_information(self, parameters: dict):
pass
def execute_queries(self):
time_result = {}
queries = {"follow the line": self.car_follow_the_line,
"avg angle servo": self.arm_servo_avg_angle,
"squared area": self.cars_in_squared_area,
# "collisions": self.cars_collided,
"avg time window servo": self.arm_servo_aggregated_information}
parameters = self._get_prepared_parameters()
for keys in queries.keys():
start_time = time.time()
queries[keys](parameters)
time_result[keys] = time.time() - start_time
return time_result
@staticmethod
def _get_rand_parameters():
cars = ["NXJCar", "NXJCarPT", "XXXCar", "XXXCarPT", "SSSCar", "SSSCarPT"]
braccios = ["braccio", "braccio2"]
timestamp_car = 16274847350
timestamp_braccio = 0
random.seed(333)
parameters = {"twin_id": random.choice(cars),
"ts1_car": timestamp_car + random.randint(0, 20),
"ts1_braccio": timestamp_braccio + random.randint(0, 10),
"sq_xa": random.uniform(0, 6),
"sq_ya": random.uniform(0, 6),
"sq_length": random.uniform(0, 6),
"servo": random.randint(1, 7),
"time_window": random.randint(2, 10),
"collision_distance": random.uniform(0, 2)
}
parameters["ts2_car"] = parameters["ts1_car"] + 10 * random.randint(2, 20)
parameters["ts2_braccio"] = parameters["ts1_braccio"] + random.randint(2, 20)
return parameters
@staticmethod
def _get_prepared_parameters():
parameters = {"twin_id": "NXJCar",
"ts1_car": 16274847350,
"ts2_car": 16274847380,
"ts1_braccio": 0,
"ts2_braccio": 20,
"sq_xa": 3,
"sq_ya": 0,
"sq_length": 3,
"servo": 3,
"time_window": 10,
"collision_distance": 3
}
return parameters