-
Notifications
You must be signed in to change notification settings - Fork 43
/
taskmanager.py
141 lines (118 loc) · 5.1 KB
/
taskmanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from threading import Lock
from twisted.internet import reactor
from twisted.internet.base import DelayedCall
from twisted.internet.defer import Deferred, DeferredList
from twisted.internet.task import LoopingCall
from twisted.python.threadable import isInIOThread
from .util import blocking_call_on_reactor_thread
# Initial value of TaskManager._cleanup_counter: the number of calls to
# TaskManager._maybe_clean_task_list() that are skipped before finished tasks are swept from the pending task list.
CLEANUP_FREQUENCY = 100
class TaskManager(object):
    """
    Provides a set of tools to maintain a list of twisted "tasks" (Deferred, LoopingCall, DelayedCall) that are to be
    executed during the lifetime of an arbitrary object, usually getting killed with it.

    Tasks are stored by name in ``_pending_tasks``. A task registered with a delay is stored as a
    ``(DelayedCall, task)`` tuple, where the DelayedCall is the timer that will fire/start the actual task.
    """

    # Reactor used to schedule delayed tasks. Kept as a class attribute, presumably so that tests can substitute
    # another reactor -- TODO confirm.
    _reactor = reactor

    def __init__(self):
        # name -> Deferred | DelayedCall | LoopingCall | (DelayedCall, Deferred | LoopingCall)
        self._pending_tasks = {}
        # Countdown until the next sweep of finished tasks, see _maybe_clean_task_list().
        self._cleanup_counter = CLEANUP_FREQUENCY
        # Guards insertion into _pending_tasks (register_task may be called from non-reactor threads, see the TODO
        # in register_task).
        self._task_lock = Lock()

    def replace_task(self, name, task):
        """
        Replace named task with the new one, cancelling the old one in the process.

        :param name: the name of the task to replace.
        :param task: the new Deferred, DelayedCall or LoopingCall to store under that name.
        :return: whatever register_task() returns for the new task.
        """
        self.cancel_pending_task(name)
        return self.register_task(name, task)

    def register_task(self, name, task, delay=None, value=None, interval=None):
        """
        Register a task so it can be canceled at shutdown time or by name.

        :param name: the key under which the task is stored; must not belong to a task that is still active.
        :param task: a Deferred, DelayedCall or LoopingCall.
        :param delay: if not None, the delay passed to reactor.callLater before the Deferred is fired or the
                      LoopingCall is started. Not supported for DelayedCall tasks.
        :param value: the value the Deferred is fired with; required (not None) when delaying a Deferred.
        :param interval: the interval passed to LoopingCall.start; required when delaying a LoopingCall.
        :return: the object that was stored: the task itself, or a (DelayedCall, task) tuple if a delay was given.
        :raises ValueError: if a delay is given together with a DelayedCall, or without the matching
                            value/interval argument.
        """
        # TODO(Martijn): this assert is necessary, however, we are calling register_task many times from a non-io
        # thread in Tribler. So for now, I will comment it out until we fixed the calls to this method.
        #assert isInIOThread()
        assert not self.is_pending_task_active(name), name
        assert isinstance(task, (Deferred, DelayedCall, LoopingCall)), (task, type(task))

        if delay is not None:
            if isinstance(task, Deferred):
                if value is None:
                    raise ValueError("Expecting value to fire the Deferred with")
                dc = self._reactor.callLater(delay, task.callback, value)
            elif isinstance(task, LoopingCall):
                if interval is None:
                    raise ValueError("Expecting interval for delayed LoopingCall")
                dc = self._reactor.callLater(delay, task.start, interval)
            else:
                raise ValueError("Expecting Deferred or LoopingCall if task is delayed")
            # Keep the timer next to the task so that both can be inspected and cancelled later on.
            task = (dc, task)

        # Must happen before taking the lock: the lock is not re-entrant.
        self._maybe_clean_task_list()
        with self._task_lock:
            self._pending_tasks[name] = task
        return task

    # NOTE(review): the decorator comes from .util; judging by its name it makes this method run (blocking) on the
    # reactor thread -- confirm against its definition.
    @blocking_call_on_reactor_thread
    def cancel_pending_task(self, name):
        """
        Cancels the named task.

        Nothing happens if no task is registered under that name or if the task is no longer active.
        """
        self._maybe_clean_task_list()
        is_active, stopfn = self._get_isactive_stopper(name)
        if is_active and stopfn:
            stopfn()
            # Use a default: stopping the task may fire callbacks that already dropped the entry.
            self._pending_tasks.pop(name, None)

    def cancel_all_pending_tasks(self):
        """
        Cancels all the registered tasks.
        This usually should be called when stopping or destroying the object so no tasks are left floating around.
        """
        assert all(isinstance(task, (Deferred, DelayedCall, LoopingCall, tuple))
                   for task in list(self._pending_tasks.values())), self._pending_tasks

        # Iterate over a snapshot: cancelling (and the periodic cleanup it triggers) mutates the dictionary.
        for name in list(self._pending_tasks):
            self.cancel_pending_task(name)

    def is_pending_task_active(self, name):
        """
        Return a boolean determining if a task is active.

        Unknown names are reported as not active.
        """
        return self._get_isactive_stopper(name)[0]

    def wait_for_deferred_tasks(self):
        """
        Returns a deferred that will fire when all registered Deferreds are done.

        Only tasks stored directly as a Deferred are considered; Deferreds registered with a delay are stored as
        (DelayedCall, Deferred) tuples and are not waited for.
        """
        assert isInIOThread()
        self._maybe_clean_task_list()
        # Hand over a concrete list rather than a generator, so DeferredList can take its length and iterate it.
        return DeferredList(list(self._iter_deferreds()))

    def _iter_deferreds(self):
        """
        Yield every pending task that is stored directly as a Deferred.
        """
        # Snapshot the values so that concurrent registrations/cancellations cannot break the iteration.
        for task in list(self._pending_tasks.values()):
            if isinstance(task, Deferred):
                yield task

    def _get_isactive_stopper(self, name):
        """
        Return a boolean determining if a task is active and its cancel/stop method if the task is registered.

        :return: an (is_active, stop_function) tuple; (False, None) if nothing usable is registered under name.
        """
        task = self._pending_tasks.get(name, None)

        def do_get(task):
            if isinstance(task, Deferred):
                # Have in mind that any deferred in the pending tasks list should have been constructed with a
                # canceller function.
                return not task.called, getattr(task, 'cancel', None)
            elif isinstance(task, DelayedCall):
                return task.active(), task.cancel
            elif isinstance(task, LoopingCall):
                return task.running, task.stop
            elif isinstance(task, tuple):
                # Delayed task: while the timer is still pending, cancelling the timer is enough. Once it has
                # fired, the state of the wrapped task is what counts.
                delayed_call, wrapped_task = task[0], task[1]
                if delayed_call.active():
                    return True, delayed_call.cancel
                return do_get(wrapped_task)
            else:
                # Covers unknown names (task is None) and unsupported task types.
                return False, None

        return do_get(task)

    def _maybe_clean_task_list(self):
        """
        Removes finished tasks from the task list.

        The sweep only runs once the countdown has reached zero; every other call just decrements the counter.
        """
        if self._cleanup_counter:
            self._cleanup_counter -= 1
        else:
            # Restart the countdown, so the next full sweep is again CLEANUP_FREQUENCY calls away.
            self._cleanup_counter = CLEANUP_FREQUENCY
            # Iterate over a snapshot of the names since entries are removed while looping.
            for name in list(self._pending_tasks):
                if not self.is_pending_task_active(name):
                    self._pending_tasks.pop(name, None)
# Names exported by "from <this module> import *".
__all__ = ["TaskManager"]