forked from aralab-unr/GA-mammograms
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathalgorithm.py
243 lines (185 loc) · 7.31 KB
/
algorithm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import threading
import time
from abc import ABC, abstractmethod
from typing import Generic, List, TypeVar
import numpy as np
from jmetal.config import store
from jmetal.core.problem import Problem
from jmetal.core.solution import FloatSolution
# from jmetal.logger import get_logger
#
# logger = get_logger(__name__)
S = TypeVar("S")
R = TypeVar("R")
"""
.. module:: algorithm
:platform: Unix, Windows
:synopsis: Templates for algorithms.
.. moduleauthor:: Antonio J. Nebro <[email protected]>, Antonio Benítez-Hidalgo <[email protected]>
"""
class Algorithm(Generic[S, R], threading.Thread, ABC):
def __init__(self):
threading.Thread.__init__(self)
self.solutions: List[S] = []
self.evaluations = 0
self.start_computing_time = 0
self.total_computing_time = 0
self.observable = store.default_observable
@abstractmethod
def create_initial_solutions(self) -> List[S]:
"""Creates the initial list of solutions of a metaheuristic."""
pass
@abstractmethod
def evaluate(self, solution_list: List[S]) -> List[S]:
"""Evaluates a solution list."""
pass
@abstractmethod
def init_progress(self) -> None:
"""Initialize the algorithm."""
pass
@abstractmethod
def stopping_condition_is_met(self) -> bool:
"""The stopping condition is met or not."""
pass
@abstractmethod
def step(self) -> None:
"""Performs one iteration/step of the algorithm's loop."""
pass
@abstractmethod
def update_progress(self) -> None:
"""Update the progress after each iteration."""
pass
@abstractmethod
def get_observable_data(self) -> dict:
"""Get observable data, with the information that will be send to all observers each time."""
pass
def run(self):
"""Execute the algorithm."""
self.start_computing_time = time.time()
# logger.debug("Creating initial set of solutions...")
self.solutions = self.create_initial_solutions()
# logger.debug("Evaluating solutions...")
self.solutions = self.evaluate(self.solutions)
# logger.debug("Initializing progress...")
self.init_progress()
# logger.debug("Running main loop until termination criteria is met")
while not self.stopping_condition_is_met():
self.step()
self.update_progress()
# logger.debug("Finished!")
self.total_computing_time = time.time() - self.start_computing_time
@abstractmethod
def get_result(self) -> R:
pass
@abstractmethod
def get_name(self) -> str:
pass
class DynamicAlgorithm(Algorithm[S, R], ABC):
@abstractmethod
def restart(self) -> None:
pass
class EvolutionaryAlgorithm(Algorithm[S, R], ABC):
def __init__(self, problem: Problem[S], population_size: int, offspring_population_size: int):
super(EvolutionaryAlgorithm, self).__init__()
self.problem = problem
self.population_size = population_size
self.offspring_population_size = offspring_population_size
@abstractmethod
def selection(self, population: List[S]) -> List[S]:
"""Select the best-fit individuals for reproduction (parents)."""
pass
@abstractmethod
def reproduction(self, population: List[S]) -> List[S]:
"""Breed new individuals through crossover and mutation operations to give birth to offspring."""
pass
@abstractmethod
def replacement(self, population: List[S], offspring_population: List[S]) -> List[S]:
"""Replace least-fit population with new individuals."""
pass
def get_observable_data(self) -> dict:
return {
"PROBLEM": self.problem,
"EVALUATIONS": self.evaluations,
"SOLUTIONS": self.get_result(),
"COMPUTING_TIME": time.time() - self.start_computing_time,
}
def init_progress(self) -> None:
self.evaluations = self.population_size
observable_data = self.get_observable_data()
self.observable.notify_all(**observable_data)
def step(self):
solution_array = []
for solution in self.solutions:
solution_array.append(-1.0 * solution.objectives[0])
solution_array = np.array(solution_array)
with open('generation_stats.txt', 'a') as output:
# save as bestAUC, avgAUC
output.write(str(np.max(solution_array)) + ',' + str(np.average(solution_array)) + "\n")
mating_population = self.selection(self.solutions)
offspring_population = self.reproduction(mating_population)
offspring_population = self.evaluate(offspring_population)
self.solutions = self.replacement(self.solutions, offspring_population)
def update_progress(self) -> None:
self.evaluations += self.offspring_population_size
observable_data = self.get_observable_data()
self.observable.notify_all(**observable_data)
@property
def label(self) -> str:
return f"{self.get_name()}.{self.problem.get_name()}"
class ParticleSwarmOptimization(Algorithm[FloatSolution, List[FloatSolution]], ABC):
def __init__(self, problem: Problem[S], swarm_size: int):
super(ParticleSwarmOptimization, self).__init__()
self.problem = problem
self.swarm_size = swarm_size
@abstractmethod
def initialize_velocity(self, swarm: List[FloatSolution]) -> None:
pass
@abstractmethod
def initialize_particle_best(self, swarm: List[FloatSolution]) -> None:
pass
@abstractmethod
def initialize_global_best(self, swarm: List[FloatSolution]) -> None:
pass
@abstractmethod
def update_velocity(self, swarm: List[FloatSolution]) -> None:
pass
@abstractmethod
def update_particle_best(self, swarm: List[FloatSolution]) -> None:
pass
@abstractmethod
def update_global_best(self, swarm: List[FloatSolution]) -> None:
pass
@abstractmethod
def update_position(self, swarm: List[FloatSolution]) -> None:
pass
@abstractmethod
def perturbation(self, swarm: List[FloatSolution]) -> None:
pass
def get_observable_data(self) -> dict:
return {
"PROBLEM": self.problem,
"EVALUATIONS": self.evaluations,
"SOLUTIONS": self.get_result(),
"COMPUTING_TIME": time.time() - self.start_computing_time,
}
def init_progress(self) -> None:
self.evaluations = self.swarm_size
self.initialize_velocity(self.solutions)
self.initialize_particle_best(self.solutions)
self.initialize_global_best(self.solutions)
observable_data = self.get_observable_data()
self.observable.notify_all(**observable_data)
def step(self):
self.update_velocity(self.solutions)
self.update_position(self.solutions)
self.perturbation(self.solutions)
self.solutions = self.evaluate(self.solutions)
self.update_global_best(self.solutions)
self.update_particle_best(self.solutions)
def update_progress(self) -> None:
self.evaluations += self.swarm_size
observable_data = self.get_observable_data()
self.observable.notify_all(**observable_data)
@property
def label(self) -> str:
return f"{self.get_name()}.{self.problem.get_name()}"