-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconvert_routes.py
94 lines (74 loc) · 2.55 KB
/
convert_routes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import json
import argparse
import pandas as pd
import geopandas as gpd
import shapely.geometry as geo
### Command line
parser = argparse.ArgumentParser(description = "LEAD Converter from JSprit to MATSim")
parser.add_argument("--scenario-path", type = str, required = True)
parser.add_argument("--solution-path", type = str, required = True)
parser.add_argument("--output-path", type = str, required = True)
parser.add_argument("--crs", type = str, required = True)
parser.add_argument("--start-time", type = float, required = False, default = 8.0 * 3600.0)
arguments = parser.parse_args()
### Read data
with open(arguments.solution_path) as f:
solution = json.load(f)
with open(arguments.scenario_path) as f:
scenario = json.load(f)
### Convert data
vehicles = []
speeds = {}
start_time = arguments.start_time
for route in solution["routes"]:
vehicle_type = None
stops = []
# Figure out vehicle type and speed
for vt in scenario["vehicle_types"]:
if vt["id"] == route["vehicle_type"]:
vehicle_type = vt
break
if vehicle_type is None:
raise RuntimeError("Vehicle type {} does not exist in scenario".format(route["vehicle_type"]))
speeds[route["vehicle_type"]] = vehicle_type["speed_km_h"]
# Convert coordinates
df = gpd.GeoDataFrame(pd.DataFrame({
"geometry": [
geo.Point(stop["lng"], stop["lat"])
for stop in route["trajectory"]
]
}), crs = "EPSG:4326").to_crs(arguments.crs)
# Convert stops
for stop, geometry in zip(route["trajectory"], df["geometry"]):
stop_type = None
if stop["type"] in ("start", "end"):
stop_type = stop["type"]
elif stop["type"] == "pickupShipment":
stop_type = "pickup"
elif stop["type"] == "deliverShipment":
stop_type = "delivery"
else:
raise RuntimeError("Unknown shop type: {}".format(stop["type"]))
stops.append({
"type": stop_type,
"arrival_time": stop["t0"] + start_time,
"departure_time": stop["t1"] + start_time,
"location": {
"x": geometry.x, "y": geometry.y
}
})
vehicles.append({
"vehicle_type": vehicle_type["id"],
"stops": stops
})
### Prepare and write output
vehicle_types = [
{ "id": id, "speed_km_h": speed }
for id, speed in speeds.items()
]
output = {
"vehicle_types": vehicle_types,
"vehicles": vehicles
}
with open(arguments.output_path, "w+") as f:
json.dump(output, f, indent = 2)