-
Notifications
You must be signed in to change notification settings - Fork 853
/
Copy pathprocess_control_data.py
210 lines (167 loc) · 7.95 KB
/
process_control_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import datetime
from syscore.exceptions import missingData
from sysobjects.production.process_control import (
dictOfControlProcesses,
controlProcess,
was_running_pid_notok_closed,
processNotRunning,
)
from syscore.constants import named_object, success
from sysdata.base_data import baseData
from syslogging.logger import *
class controlProcessData(baseData):
    """
    Store and manipulate ``controlProcess`` objects, keyed by process name.

    This implementation keeps the objects in an in-memory dict
    (``self._control_store``). Every read and write is funnelled through
    ``_get_control_for_process_name_without_default``,
    ``_add_control_for_process_name`` and
    ``_modify_existing_control_for_process_name``
    (NOTE(review): presumably so subclasses can swap in another storage
    backend by overriding just those methods - confirm against subclasses).

    The public methods all follow the same pattern: fetch the control object
    for a process, call the matching method on it, and (for mutating calls)
    write the object back to the store.
    """

    def __init__(self, log=get_logger("controlProcessData")):
        super().__init__(log=log)
        # process name -> controlProcess
        self._control_store = dict()

    def get_dict_of_control_processes(self) -> dictOfControlProcesses:
        """Return every stored control object, keyed by process name."""
        control_by_name = {
            process_name: self.get_control_for_process_name(process_name)
            for process_name in self.get_list_of_process_names()
        }
        return dictOfControlProcesses(control_by_name)

    def get_list_of_process_names(self) -> list:
        """Return the names of all processes currently in the store."""
        return list(self._control_store.keys())

    def get_control_for_process_name(self, process_name: str) -> controlProcess:
        """
        Return the control object for ``process_name``.

        If nothing is stored under that name a fresh default
        ``controlProcess()`` is returned instead; it is NOT added to the
        store by this call.
        """
        try:
            return self._get_control_for_process_name_without_default(process_name)
        except missingData:
            return controlProcess()

    def _get_control_for_process_name_without_default(
        self, process_name: str
    ) -> controlProcess:
        """
        Return the stored control object for ``process_name``.

        :raises missingData: if the name is not in the store
        """
        try:
            control = self._control_store[process_name]
        except KeyError as key_error:
            # Chain the KeyError so the original lookup failure stays visible
            raise missingData(
                "Process %s not found in control store" % process_name
            ) from key_error

        return control

    def _update_control_for_process_name(
        self, process_name: str, new_control_object: controlProcess
    ) -> None:
        """Write ``new_control_object`` to the store, adding or modifying as needed."""
        try:
            self._get_control_for_process_name_without_default(process_name)
        except missingData:
            self._add_control_for_process_name(process_name, new_control_object)
        else:
            self._modify_existing_control_for_process_name(
                process_name, new_control_object
            )

    def _add_control_for_process_name(
        self, process_name: str, new_control_object: controlProcess
    ) -> None:
        """Store a control object under a name that is not yet present."""
        self._control_store[process_name] = new_control_object

    def _modify_existing_control_for_process_name(
        self, process_name: str, new_control_object: controlProcess
    ) -> None:
        """Replace the control object stored under an existing name."""
        self._control_store[process_name] = new_control_object

    def delete_control_for_process_name(self, process_name: str):
        """Not implemented in this class; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def check_if_okay_to_start_process(self, process_name: str) -> named_object:
        """
        Ask the control object whether the process may start. Nothing is
        written back to the store.

        :param process_name: str
        :return: success, or if not okay: process_no_run, process_stop, process_running
        """
        original_process = self.get_control_for_process_name(process_name)
        return original_process.check_if_okay_to_start_process()

    def start_process(self, process_name: str) -> named_object:
        """
        Try to start the process; the updated control object is only written
        back to the store when the start succeeded.

        :param process_name: str
        :return: success, or if not okay: process_no_run, process_stop, process_running
        """
        original_process = self.get_control_for_process_name(process_name)
        result = original_process.start_process()
        if result is success:
            self._update_control_for_process_name(process_name, original_process)

        return result

    def check_if_should_pause_process(self, process_name: str) -> bool:
        """Return the control object's ``check_if_should_pause()`` result."""
        original_process = self.get_control_for_process_name(process_name)
        return original_process.check_if_should_pause()

    def check_if_pid_running_and_if_not_finish_all_processes(self) -> list:
        """
        Run ``check_if_pid_running_and_if_not_finish`` for every stored process.

        :return: list with one entry per process; because the per-process
                 method has no return value, every entry is ``None``
        """
        return [
            self.check_if_pid_running_and_if_not_finish(process_name)
            for process_name in self.get_list_of_process_names()
        ]

    def check_if_pid_running_and_if_not_finish(self, process_name: str) -> None:
        """
        Ask the control object to check its PID; if the check reports
        ``was_running_pid_notok_closed``, log a critical message and write
        the updated control object back to the store.
        """
        original_process = self.get_control_for_process_name(process_name)
        # Read the PID before the status check runs, so the log message can
        # quote it
        # NOTE(review): the check may clear the stored PID - confirm in controlProcess
        PID = original_process.process_id
        result = original_process.check_if_pid_running_and_if_not_finish_return_status()
        if result is was_running_pid_notok_closed:
            self.log.critical(
                "Process %s with PID %d appears to have crashed, marking as close: you may want to restart"
                % (process_name, PID)
            )
            self._update_control_for_process_name(process_name, original_process)

    def finish_all_processes(self) -> named_object:
        """
        Call ``finish_process`` for every stored process, ignoring
        ``processNotRunning`` so one idle process does not stop the sweep.

        :return: success (always)
        """
        for process_name in self.get_list_of_process_names():
            try:
                self.finish_process(process_name)
            except processNotRunning:
                # Best effort: skip this process and carry on with the rest
                pass

        return success

    def finish_process(self, process_name: str) -> None:
        """
        Mark the process as finished and write the control object back.

        Returns nothing. Any exception raised by the control object's
        ``finish_process()`` propagates and the store is then left untouched
        (``finish_all_processes`` expects ``processNotRunning`` here).

        :param process_name: str
        """
        original_process = self.get_control_for_process_name(process_name)
        original_process.finish_process()
        self._update_control_for_process_name(process_name, original_process)

    def check_if_process_status_stopped(self, process_name: str) -> bool:
        """
        :param process_name: str
        :return: bool, the control object's ``check_if_process_status_stopped()``
        """
        original_process = self.get_control_for_process_name(process_name)
        return original_process.check_if_process_status_stopped()

    def has_process_finished_in_last_day(self, process_name: str) -> bool:
        """
        :param process_name: str
        :return: bool, the control object's ``has_process_finished_in_last_day()``
        """
        original_process = self.get_control_for_process_name(process_name)
        return original_process.has_process_finished_in_last_day()

    def change_status_to_stop(self, process_name: str) -> None:
        """Call ``change_status_to_stop()`` on the control object and store it."""
        original_process = self.get_control_for_process_name(process_name)
        original_process.change_status_to_stop()
        self._update_control_for_process_name(process_name, original_process)

    def change_status_to_go(self, process_name: str) -> None:
        """Call ``change_status_to_go()`` on the control object and store it."""
        original_process = self.get_control_for_process_name(process_name)
        original_process.change_status_to_go()
        self._update_control_for_process_name(process_name, original_process)

    def change_status_to_no_run(self, process_name: str) -> None:
        """Call ``change_status_to_no_run()`` on the control object and store it."""
        original_process = self.get_control_for_process_name(process_name)
        original_process.change_status_to_no_run()
        self._update_control_for_process_name(process_name, original_process)

    def change_status_to_pause(self, process_name: str) -> None:
        """Call ``change_status_to_pause()`` on the control object and store it."""
        original_process = self.get_control_for_process_name(process_name)
        original_process.change_status_to_pause()
        self._update_control_for_process_name(process_name, original_process)

    def log_start_run_for_method(self, process_name: str, method_name: str) -> None:
        """Record on the control object that ``method_name`` started, then store it."""
        original_process = self.get_control_for_process_name(process_name)
        original_process.log_start_run_for_method(method_name)
        self._update_control_for_process_name(process_name, original_process)

    def log_end_run_for_method(self, process_name: str, method_name: str) -> None:
        """Record on the control object that ``method_name`` ended, then store it."""
        original_process = self.get_control_for_process_name(process_name)
        original_process.log_end_run_for_method(method_name)
        self._update_control_for_process_name(process_name, original_process)

    def when_method_last_started(
        self, process_name: str, method_name: str
    ) -> datetime.datetime:
        """Return the control object's ``when_method_last_started(method_name)``."""
        original_process = self.get_control_for_process_name(process_name)
        return original_process.when_method_last_started(method_name)

    def when_method_last_ended(
        self, process_name: str, method_name: str
    ) -> datetime.datetime:
        """Return the control object's ``when_method_last_ended(method_name)``."""
        original_process = self.get_control_for_process_name(process_name)
        return original_process.when_method_last_ended(method_name)

    def method_currently_running(self, process_name: str, method_name: str) -> bool:
        """Return the control object's ``method_currently_running(method_name)``."""
        original_process = self.get_control_for_process_name(process_name)
        return original_process.method_currently_running(method_name)