forked from Tribler/dispersy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtaskmanager.py
124 lines (103 loc) · 4.38 KB
/
taskmanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from threading import Lock
from .util import blocking_call_on_reactor_thread
from twisted.internet import reactor
from twisted.internet.base import DelayedCall
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall
# Initial value of each TaskManager's cleanup countdown: _maybe_clean_task_list() decrements the countdown on every
# call and only sweeps finished tasks out of the pending-task table once the countdown has reached zero.
CLEANUP_FREQUENCY = 100
class TaskManager(object):
    """
    Provides a set of tools to maintain a list of twisted "tasks" (Deferred, LoopingCall, DelayedCall) that are to be
    executed during the lifetime of an arbitrary object, usually getting killed with it.

    Tasks are tracked by name in a dictionary. Finished tasks are not removed immediately; they are swept out
    periodically (see _maybe_clean_task_list) or when they get cancelled/replaced by name.
    """

    # Class-level reference to the reactor used for scheduling delayed tasks, so it can be overridden per
    # class/instance without touching the global reactor.
    _reactor = reactor

    def __init__(self):
        # Maps task name -> task. A task registered with a delay is stored as a (DelayedCall, task) tuple.
        self._pending_tasks = {}
        # Countdown of register/cancel calls left until the next sweep of finished tasks.
        self._cleanup_counter = CLEANUP_FREQUENCY
        # Guards insertion into the pending task table in register_task().
        self._task_lock = Lock()

    def replace_task(self, name, task):
        """
        Replace named task with the new one, cancelling the old one in the process.

        It is not an error if no task is currently registered under that name.

        :param name: the name the task is (to be) registered under.
        :param task: the new Deferred, DelayedCall or LoopingCall.
        :return: whatever register_task() returns for the new task.
        """
        self.cancel_pending_task(name)
        return self.register_task(name, task)

    def register_task(self, name, task, delay=None, value=None, interval=None):
        """
        Register a task so it can be canceled at shutdown time or by name.

        :param name: the name to register the task under; there must be no active task with that name.
        :param task: a Deferred, DelayedCall or LoopingCall.
        :param delay: optional number of seconds after which the task should be fired/started. Only supported for
                      Deferred (fired with `value`) and LoopingCall (started with `interval`).
        :param value: the value to fire a delayed Deferred with. Required (non-None) when delaying a Deferred.
        :param interval: the interval to start a delayed LoopingCall with. Required when delaying a LoopingCall.
        :return: the registered task, or a (DelayedCall, task) tuple if a delay was given.
        :raises ValueError: if a delay is given with a DelayedCall, or without the matching value/interval.
        """
        assert not self.is_pending_task_active(name), name
        assert isinstance(task, (Deferred, DelayedCall, LoopingCall)), (task, type(task))

        if delay is not None:
            if isinstance(task, Deferred):
                if value is None:
                    raise ValueError("Expecting value to fire the Deferred with")
                delayed_call = self._reactor.callLater(delay, task.callback, value)
            elif isinstance(task, LoopingCall):
                if interval is None:
                    raise ValueError("Expecting interval for delayed LoopingCall")
                delayed_call = self._reactor.callLater(delay, task.start, interval)
            else:
                raise ValueError("Expecting Deferred or LoopingCall if task is delayed")
            # Keep both: the DelayedCall is what has to be cancelled while the delay has not elapsed yet, the
            # wrapped task is what has to be cancelled/stopped afterwards.
            task = (delayed_call, task)

        self._maybe_clean_task_list()
        with self._task_lock:
            self._pending_tasks[name] = task
        return task

    @blocking_call_on_reactor_thread
    def cancel_pending_task(self, name):
        """
        Cancels the named task and forgets about it.

        Names that are not (or no longer) registered are silently ignored.

        :param name: the name the task was registered under.
        """
        self._maybe_clean_task_list()
        is_active, stopfn = self._get_isactive_stopper(name)
        if is_active and stopfn:
            stopfn()
        # Use a default: the entry may never have existed, or may already be gone because the periodic sweep or a
        # callback triggered by stopfn() removed it.
        self._pending_tasks.pop(name, None)

    def cancel_all_pending_tasks(self):
        """
        Cancels all the registered tasks.
        This usually should be called when stopping or destroying the object so no tasks are left floating around.
        """
        assert all(isinstance(task, (Deferred, DelayedCall, LoopingCall, tuple))
                   for task in self._pending_tasks.values()), self._pending_tasks

        # Iterate over a snapshot of the names: cancel_pending_task() mutates the dictionary.
        for name in list(self._pending_tasks):
            self.cancel_pending_task(name)

    def is_pending_task_active(self, name):
        """
        Return a boolean determining if a task is active.

        :param name: the name the task was registered under.
        :return: the "is active" flag computed by _get_isactive_stopper(); False for unknown names.
        """
        return self._get_isactive_stopper(name)[0]

    def _get_isactive_stopper(self, name):
        """
        Return a boolean determining if a task is active and its cancel/stop method if the task is registered.

        :param name: the name the task was registered under.
        :return: an (is_active, stop_function) tuple; (False, None) if no task is registered under that name.
        """
        def do_get(task):
            if isinstance(task, Deferred):
                # Have in mind that any deferred in the pending tasks list should have been constructed with a
                # canceller function.
                return not task.called, getattr(task, 'cancel', None)
            if isinstance(task, DelayedCall):
                return task.active(), task.cancel
            if isinstance(task, LoopingCall):
                return task.running, task.stop
            if isinstance(task, tuple):
                # Delayed registration: (DelayedCall, wrapped task).
                delayed_call, wrapped_task = task[0], task[1]
                if delayed_call.active():
                    # The delay has not elapsed yet, cancelling the DelayedCall is enough.
                    return True, delayed_call.cancel
                # The delay already elapsed (or was cancelled), the wrapped task determines the state now.
                return do_get(wrapped_task)
            # Not registered (None) or of an unknown type.
            return False, None

        return do_get(self._pending_tasks.get(name, None))

    def _maybe_clean_task_list(self):
        """
        Removes finished tasks from the task list.

        The sweep is only performed once the countdown started at CLEANUP_FREQUENCY reaches zero; every other call
        just decrements the countdown.
        """
        if self._cleanup_counter:
            self._cleanup_counter -= 1
            return

        # Restart the countdown, otherwise every subsequent call would do a full sweep.
        self._cleanup_counter = CLEANUP_FREQUENCY
        # Iterate over a snapshot of the names as entries get removed while looping.
        for name in list(self._pending_tasks):
            if not self.is_pending_task_active(name):
                self._pending_tasks.pop(name, None)
# Names exported by a star-import of this module.
__all__ = ["TaskManager"]