-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlogger_flow.py
52 lines (43 loc) · 1.29 KB
/
logger_flow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import time
from metaflow import FlowSpec, step, Parameter, retry, current
from metrics_logger import metrics_logger, MetricsLogger
class LoggerFlow(FlowSpec):
@step
def start(self):
self.data = [1, 2, 3, 4, 5]
self.next(self.train, foreach="data")
@metrics_logger
@step
def train(self):
setattr(self, "data_" + str(self.input), self.input)
logger = MetricsLogger(
{
"flow_name": current.flow_name,
"run_id": current.run_id,
"step_name": current.step_name,
"task_id": current.task_id,
}
)
for i in range(20 * self.input):
logger.log_step(i, "accuracy", i * 0.01)
logger.log_step(i, "loss", 1 - i * 0.01)
logger.log_step(i, "learning_rate", 0.01)
logger.save()
time.sleep(0.25)
self.next(self.join)
@step
def join(self, inputs):
self.merge_artifacts(inputs)
self.next(self.end)
@step
def end(self):
print(
"values of data_1, data_2, data_3, data_4, data_5",
self.data_1,
self.data_2,
self.data_3,
self.data_4,
self.data_5,
)
if __name__ == "__main__":
LoggerFlow()