-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbasic_aco.py
132 lines (107 loc) · 4.99 KB
/
basic_aco.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import numpy as np
import random
from vprtw_aco_figure import VrptwAcoFigure
from vrptw_base import VrptwGraph, PathMessage
from ant import Ant
from threading import Thread
from queue import Queue
import time
class BasicACO:
    """Basic ant colony optimisation search over a ``VrptwGraph``.

    Every iteration lets ``ants_num`` ants each build a complete tour, keeps
    the shortest tour seen so far in ``best_path`` / ``best_path_distance`` /
    ``best_vehicle_num`` and hands it to the graph's pheromone update hooks.
    """

    # The search stops early once more than this many iterations have passed
    # since the last improvement of the best path.
    STAGNATION_LIMIT = 100

    def __init__(self, graph: 'VrptwGraph', ants_num=10, max_iter=200, beta=2, q0=0.1,
                 whether_or_not_to_show_figure=True):
        """
        :param graph: problem graph; its ``vehicle_capacity`` is copied to ``max_load``
        :param ants_num: number of ants created in every iteration
        :param max_iter: upper bound on the number of iterations
        :param beta: exponent applied to the heuristic information when weighting moves
        :param q0: probability of greedily taking the highest-weighted candidate
        :param whether_or_not_to_show_figure: when true, improved paths are pushed
            to a queue consumed by ``VrptwAcoFigure``
        """
        super().__init__()
        self.graph = graph
        self.ants_num = ants_num
        self.max_iter = max_iter
        self.max_load = graph.vehicle_capacity
        self.beta = beta
        self.q0 = q0
        self.best_path_distance = None
        self.best_path = None
        self.best_vehicle_num = None
        self.whether_or_not_to_show_figure = whether_or_not_to_show_figure

    def run_basic_aco(self):
        """Run the search in a worker thread; optionally draw in the calling thread.

        :return: None
        """
        path_queue_for_figure = Queue()
        show_figure = self.whether_or_not_to_show_figure

        def search_then_signal_end():
            try:
                self._basic_aco(path_queue_for_figure)
            finally:
                # The (None, None) message is the end marker. It is enqueued as
                # soon as the search ends (even on error), while the figure can
                # still read it; enqueueing it only after figure.run() returned
                # left nobody to consume it.
                # NOTE(review): presumably VrptwAcoFigure.run() loops until it
                # sees this marker - confirm against vprtw_aco_figure.
                if show_figure:
                    path_queue_for_figure.put(PathMessage(None, None))

        basic_aco_thread = Thread(target=search_then_signal_end)
        basic_aco_thread.start()

        if show_figure:
            figure = VrptwAcoFigure(self.graph.nodes, path_queue_for_figure)
            figure.run()
        basic_aco_thread.join()

    def _move_ant(self, ant, next_index):
        """Move ``ant`` to ``next_index`` and locally update the traversed edge."""
        # Read the origin BEFORE moving: reading ant.current_index after the
        # move would hand the pheromone update the destination twice if the
        # move changes current_index.
        previous_index = ant.current_index
        ant.move_to_next_index(next_index)
        self.graph.local_update_pheromone(previous_index, next_index)

    def _basic_aco(self, path_queue_for_figure: Queue):
        """Main search loop.

        :param path_queue_for_figure: queue that receives a ``PathMessage`` for
            every improved best path when figure display is enabled
        :return: None
        """
        start_time_total = time.time()
        last_improvement_iteration = 0

        for iteration in range(self.max_iter):
            ants = [Ant(self.graph) for _ in range(self.ants_num)]

            for ant in ants:
                # Keep extending the tour until the ant has nothing left to visit.
                while not ant.index_to_visit_empty():
                    next_index = self.select_next_index(ant)
                    # If the candidate fails the ant's check, draw once more;
                    # if that fails as well, go to index 0 instead.
                    if not ant.check_condition(next_index):
                        next_index = self.select_next_index(ant)
                        if not ant.check_condition(next_index):
                            next_index = 0
                    self._move_ant(ant, next_index)

                # Every tour finishes at index 0.
                self._move_ant(ant, 0)

            paths_distance = np.array([ant.total_travel_distance for ant in ants])
            best_index = np.argmin(paths_distance)

            if self.best_path is None or paths_distance[best_index] < self.best_path_distance:
                self.best_path = ants[int(best_index)].travel_path
                self.best_path_distance = paths_distance[best_index]
                # One fewer than the number of times index 0 occurs in the path.
                self.best_vehicle_num = self.best_path.count(0) - 1
                last_improvement_iteration = iteration

                if self.whether_or_not_to_show_figure:
                    path_queue_for_figure.put(PathMessage(self.best_path, self.best_path_distance))

                print('\n')
                print('[iteration %d]: find a improved path, its distance is %f' % (iteration, self.best_path_distance))
                print('it takes %0.3f second multiple_ant_colony_system running' % (time.time() - start_time_total))

            self.graph.global_update_pheromone(self.best_path, self.best_path_distance)

            if iteration - last_improvement_iteration > self.STAGNATION_LIMIT:
                print('\n')
                print('iteration exit: can not find better solution in %d iteration' % self.STAGNATION_LIMIT)
                break

        print('\n')
        if self.best_path is None:
            # No iteration ran (max_iter <= 0): there is nothing to format.
            print('no path was constructed')
            return
        print('final best path distance is %f, number of vehicle is %d' % (self.best_path_distance, self.best_vehicle_num))
        print('it takes %0.3f second multiple_ant_colony_system running' % (time.time() - start_time_total))

    def select_next_index(self, ant):
        """Choose the next index for ``ant`` among ``ant.index_to_visit``.

        Each candidate is weighted by pheromone * heuristic ** beta, read from
        the row of ``ant.current_index``. With probability ``q0`` the
        highest-weighted candidate is taken; otherwise one is sampled in
        proportion to its weight.

        :param ant: object exposing ``current_index`` and ``index_to_visit``
        :return: the selected element of ``ant.index_to_visit``
        """
        current_index = ant.current_index
        index_to_visit = ant.index_to_visit

        # The weights are deliberately left unnormalised: argmax is scale
        # invariant and stochastic_accept rescales them itself, so dividing by
        # the sum would only risk a 0/0 when every weight is zero.
        transition_prob = self.graph.pheromone_mat[current_index][index_to_visit] * \
            np.power(self.graph.heuristic_info_mat[current_index][index_to_visit], self.beta)

        if np.random.rand() < self.q0:
            return index_to_visit[np.argmax(transition_prob)]
        return BasicACO.stochastic_accept(index_to_visit, transition_prob)

    @staticmethod
    def stochastic_accept(index_to_visit, transition_prob):
        """Roulette-wheel selection implemented by stochastic acceptance.

        :param index_to_visit: a list of N index (list or tuple)
        :param transition_prob: N non-negative weights, one per candidate;
            they do not need to be normalised
        :return: selected index
        :raises ValueError: if ``index_to_visit`` is empty
        """
        candidate_count = len(index_to_visit)
        if candidate_count == 0:
            raise ValueError('index_to_visit must not be empty')

        weights = np.asarray(transition_prob, dtype=float)
        max_weight = np.max(weights)
        if not np.isfinite(max_weight) or max_weight <= 0:
            # Degenerate weights (all zero, NaN or infinite) would make the
            # acceptance loop below spin forever; pick uniformly instead.
            return index_to_visit[random.randrange(candidate_count)]

        # Scaling by the maximum (not the sum) keeps the selection proportional
        # to the weights while the best candidate is accepted with probability
        # 1, which keeps the expected number of rejections small.
        acceptance = weights / max_weight
        while True:
            # Propose a candidate uniformly, then accept it with a probability
            # equal to its scaled weight.
            ind = random.randrange(candidate_count)
            if random.random() < acceptance[ind]:
                return index_to_visit[ind]