forked from Tribler/dispersy
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcallback.py
848 lines (714 loc) · 34.9 KB
/
callback.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
# Python 2.5 features
from __future__ import with_statement
"""
A callback thread running Dispersy.
"""
from heapq import heappush, heappop
from thread import get_ident
from threading import Thread, Lock, Event, currentThread
from time import sleep, time
from types import GeneratorType, TupleType
from sys import exc_info
try:
import prctl
except ImportError:
prctl = None
from .decorator import attach_profiler
from .dprint import dprint
from .revision import update_revision_information
if __debug__:
from atexit import register as atexit_register
from inspect import getsourcefile, getsourcelines
from types import LambdaType
# dprint warning when registered call, or generator call, takes more than N seconds
# NOTE(review): not referenced by the code visible in this file; _loop compares the call duration
# against a literal 1.0 instead -- confirm whether this constant should be used there
CALL_DELAY_FOR_WARNING = 0.5
# dprint warning when registered call, or generator call, should have run N seconds ago
# NOTE(review): not referenced by the code visible in this file
QUEUE_DELAY_FOR_WARNING = 1.0
# update version information directly from SVN
update_revision_information("$HeadURL$", "$Revision$")
class Callback(object):
if __debug__:
@staticmethod
def _debug_call_to_string(call):
    """
    Return a short, human readable description of CALL for debug output.

    CALL is either a (callable, args, kargs) tuple, a generator, or anything else (for
    example None for a task that was unregistered), in which case str(CALL) is returned.

    This method never raises because a name or source location is unavailable; it falls
    back to a generic description instead.
    """
    if isinstance(call, TupleType):
        func = call[0]
        # LambdaType is the very same type as FunctionType, hence isinstance() alone would
        # label every plain function as a lambda. the name check singles out real lambdas
        if isinstance(func, LambdaType) and getattr(func, "__name__", "") == "<lambda>":
            try:
                return "lambda@%s:%d" % (getsourcefile(func)[-25:], getsourcelines(func)[1])
            except (IOError, TypeError):
                # the source is unavailable (getsourcefile returned None or getsourcelines
                # could not read the file)
                return "lambda@unknown"
        # callable objects (e.g. functools.partial) do not have .__name__
        return getattr(func, "__name__", None) or str(func)
    elif isinstance(call, GeneratorType):
        # 10/02/12 Boudewijn: in python 2.5 generators do not have .__name__
        return getattr(call, "__name__", None) or str(call)
    else:
        return str(call)
def __init__(self):
    """
    Create a Callback in STATE_INIT.  No thread is created here; see start() and loop().
    """
    # _event is used to wakeup the thread when new actions arrive
    self._event = Event()
    self._event_set = self._event.set
    self._event_is_set = self._event.isSet
    # _lock is used to protect variables that are written to on multiple threads
    self._lock = Lock()
    # _thread_ident is used to detect when methods are called from the same thread.  it is zero
    # until _loop() assigns get_ident()
    self._thread_ident = 0
    # _state contains the current state of the thread.  it is protected by _lock and follows the
    # following states:
    #
    #                                              --> fatal-exception -> STATE_EXCEPTION
    #                                             /
    # STATE_INIT -> start() -> PLEASE_RUN -> STATE_RUNNING
    #                                \            \
    #                                 --------------> stop() -> PLEASE_STOP -> STATE_FINISHED
    #
    self._state = "STATE_INIT"
    if __debug__: dprint("STATE_INIT")
    # _exception is set to SystemExit, KeyboardInterrupt, GeneratorExit, or AssertionError when
    # any of the registered callbacks raises any of these exceptions.  in this case _state will
    # be set to STATE_EXCEPTION.  it is protected by _lock.  (when __debug__ is set, _loop treats
    # every other Exception the same way.  stop() may also store an exception here)
    self._exception = None
    self._exception_traceback = None
    # _exception_handlers contains a list with callable functions of methods.  all handlers are
    # called whenever an exception occurs.  first parameter is the exception, second parameter
    # is a boolean indicating if the exception is fatal (i.e. True indicates SystemExit,
    # KeyboardInterrupt, GeneratorExit, or AssertionError)
    self._exception_handlers = []
    # _id contains a running counter to ensure that every scheduled callback has its own unique
    # identifier.  it is protected by _lock
    self._id = 0
    # _requests are ordered by deadline and moved to -expired- when they need to be handled
    # (deadline, -priority, root_id, (call, args, kargs), callback)
    self._requests = []
    # expired requests are ordered and handled by priority
    # (-priority, deadline, root_id, None, (call, args, kargs), callback)
    # the priority is stored negated so that the heap pops the highest PRIORITY first
    self._expired = []
    # _requests_mirror and _expired_mirror contains the same list as _requests and _expired,
    # respectively.  when the callback closes _requests is set to a new empty list while
    # _requests_mirror continues to point to the existing one.  because all task 'deletes' are
    # done on the _requests_mirror list, these actions will still be allowed while no new tasks
    # will be accepted.
    self._requests_mirror = self._requests
    self._expired_mirror = self._expired
    if __debug__:
        # at interpreter exit, assert that this Callback is no longer running
        def must_close(callback):
            assert callback.is_finished
        atexit_register(must_close, self)
        # name given to start(), and description of the call being handled, for debug output
        self._debug_thread_name = ""
        self._debug_call_name = None
@property
def ident(self):
    """
    Returns the thread identifier stored by _loop(), or zero when the loop has not started yet.
    """
    return self._thread_ident
@property
def is_current_thread(self):
    """
    Returns True when called on this Callback thread, i.e. when the identifier of the calling
    thread equals the identifier stored by _loop().
    """
    return self._thread_ident == get_ident()
@property
def is_running(self):
    """
    Returns True when the state is STATE_RUNNING.

    The state is read without acquiring the lock.
    """
    return self._state == "STATE_RUNNING"
@property
def is_finished(self):
    """
    Returns True when the state is one of STATE_FINISHED, STATE_EXCEPTION, or STATE_INIT, i.e.
    one of the states in which the callback thread is not (or no longer) handling calls.
    """
    return self._state in ("STATE_FINISHED", "STATE_EXCEPTION", "STATE_INIT")
@property
def exception(self):
    """
    Returns the exception that caused the thread to exit, or None.

    It is set when any of the registered callbacks raises either SystemExit, KeyboardInterrupt,
    GeneratorExit, or AssertionError (when __debug__ is set, any other Exception as well), or
    when an exception is passed to stop().
    """
    return self._exception
@property
def exception_traceback(self):
    """
    Returns the traceback (the third item of sys.exc_info()) that was recorded together with
    the exception that caused the thread to exit, or None.
    """
    return self._exception_traceback
def attach_exception_handler(self, func):
    """
    Add FUNC to the list of exception notifiers.

    FUNC is called as FUNC(exception, fatal) whenever a registered call raises an exception.
    FATAL is a boolean indicating whether the exception stops the Callback thread.

    SystemExit, KeyboardInterrupt, GeneratorExit, and AssertionError are fatal and make the
    Callback thread exit.  The Callback thread keeps running on all other exceptions.

    FUNC must be callable and may not be attached twice.
    """
    assert callable(func), "handler must be callable"
    with self._lock:
        handlers = self._exception_handlers
        assert func not in handlers, "handler was already attached"
        handlers.append(func)
def detach_exception_handler(self, func):
    """
    Remove FUNC from the list of exception notifiers.

    FUNC must be callable and must currently be attached.
    """
    assert callable(func), "handler must be callable"
    with self._lock:
        handlers = self._exception_handlers
        assert func in handlers, "handler is not attached"
        handlers.remove(func)
def _call_exception_handlers(self, exception, fatal):
    """
    Notify every attached exception handler about EXCEPTION.

    FATAL is passed on as the second argument of each handler.  A handler that raises is
    reported through dprint, followed by a failing assert.
    """
    # take a snapshot while holding the lock, the handlers are invoked after releasing it
    with self._lock:
        handlers = list(self._exception_handlers)

    for handler in handlers:
        try:
            handler(exception, fatal)
        except Exception:
            dprint(exception=True, level="error")
            assert False, "the exception handler should not cause an exception"
def register(self, call, args=(), kargs=None, delay=0.0, priority=0, id_="", callback=None, callback_args=(), callback_kargs=None, include_id=False):
    """
    Register CALL to be called.

    The call will be made with ARGS and KARGS as arguments and keyword arguments, respectively.
    ARGS must be a tuple and KARGS must be a dictionary (or None).

    CALL may return a generator object that will be repeatedly called until it raises the
    StopIteration exception.  The generator must yield non-negative floating point values; it
    is rescheduled that amount of seconds after the moment it yielded.

    The call will be made after DELAY seconds.  DELAY must be a floating point value.  A DELAY
    of zero or less schedules the call immediately.

    When multiple calls should be, or should have been made, the PRIORITY will decide the order
    at which the calls are made.  Calls with a higher PRIORITY will be handled before calls with
    a lower PRIORITY.  PRIORITY must be an integer.  The default PRIORITY is 0.

    Each call is identified with an ID_.  A unique numerical identifier will be assigned when no
    ID_ is specified.  Any specified id must be a (unicode)string.  Registering multiple calls
    with the same ID_ is allowed, all calls will be handled normally, however, all these calls
    will be removed if the associated ID_ is unregistered.

    Once the call is performed the optional CALLBACK is registered to be called immediately.
    The first parameter of the CALLBACK will always be either the returned value or the raised
    exception.  If CALLBACK_ARGS is given it will be appended to the first argument.  If
    CALLBACK_KARGS is given it is added to the callback as keyword arguments.

    When INCLUDE_ID is True then ID_ or the generated identifier is appended to ARGS, i.e. it is
    given as the last positional argument to CALL.

    Returns ID_ if specified or a uniquely generated numerical identifier

    Example:
     > callback.register(my_func, delay=10.0)
     > -> my_func() will be called after 10.0 seconds

    Example:
     > def my_generator():
     >    while True:
     >       print "foo"
     >       yield 1.0
     > callback.register(my_generator)
     > -> my_generator will be called immediately printing "foo", subsequently "foo" will be
          printed at 1.0 second intervals
    """
    assert callable(call), "CALL must be callable"
    assert isinstance(args, tuple), "ARGS has invalid type: %s" % type(args)
    assert kargs is None or isinstance(kargs, dict), "KARGS has invalid type: %s" % type(kargs)
    assert isinstance(delay, float), "DELAY has invalid type: %s" % type(delay)
    assert isinstance(priority, int), "PRIORITY has invalid type: %s" % type(priority)
    assert isinstance(id_, basestring), "ID_ has invalid type: %s" % type(id_)
    assert callback is None or callable(callback), "CALLBACK must be None or callable"
    assert isinstance(callback_args, tuple), "CALLBACK_ARGS has invalid type: %s" % type(callback_args)
    assert callback_kargs is None or isinstance(callback_kargs, dict), "CALLBACK_KARGS has invalid type: %s" % type(callback_kargs)
    # %s (not %d): formatting a type object with %d raises TypeError and hides the assertion
    assert isinstance(include_id, bool), "INCLUDE_ID has invalid type: %s" % type(include_id)
    if __debug__: dprint("register ", call, " after ", delay, " seconds")

    with self._lock:
        if not id_:
            self._id += 1
            id_ = self._id

        if delay <= 0.0:
            # the priority is negated because the heap pops the smallest entry first
            heappush(self._expired,
                     (-priority,
                      time(),
                      id_,
                      None,
                      (call, args + (id_,) if include_id else args, {} if kargs is None else kargs),
                      None if callback is None else (callback, callback_args, {} if callback_kargs is None else callback_kargs)))

        else:
            heappush(self._requests,
                     (delay + time(),
                      -priority,
                      id_,
                      (call, args + (id_,) if include_id else args, {} if kargs is None else kargs),
                      None if callback is None else (callback, callback_args, {} if callback_kargs is None else callback_kargs)))

        # wakeup if sleeping
        if not self._event_is_set():
            self._event_set()
        return id_
def persistent_register(self, id_, call, args=(), kargs=None, delay=0.0, priority=0, callback=None, callback_args=(), callback_kargs=None, include_id=False):
    """
    Register CALL to be called only if ID_ has not already been registered.

    An ID_ counts as registered while a call with that ID_ is waiting in the scheduled or the
    expired queue.  Entries that were unregistered, but have not yet been discarded from the
    queues, do not count.

    Aside from the different behavior of ID_, all parameters behave as in register(...).

    Returns ID_.

    Example:
     > callback.persistent_register("my-id", my_func, ("first",), delay=60.0)
     > callback.persistent_register("my-id", my_func, ("second",))
     > -> my_func("first") will be called after 60 seconds, my_func("second") will not be called at all

    Example:
     > callback.register(my_func, ("first",), delay=60.0, id_="my-id")
     > callback.persistent_register("my-id", my_func, ("second",))
     > -> my_func("first") will be called after 60 seconds, my_func("second") will not be called at all
    """
    assert isinstance(id_, basestring), "ID_ has invalid type: %s" % type(id_)
    assert id_, "ID_ may not be an empty (unicode)string"
    assert callable(call), "CALL must be callable"
    assert isinstance(args, tuple), "ARGS has invalid type: %s" % type(args)
    assert kargs is None or isinstance(kargs, dict), "KARGS has invalid type: %s" % type(kargs)
    assert isinstance(delay, float), "DELAY has invalid type: %s" % type(delay)
    assert isinstance(priority, int), "PRIORITY has invalid type: %s" % type(priority)
    assert callback is None or callable(callback), "CALLBACK must be None or callable"
    assert isinstance(callback_args, tuple), "CALLBACK_ARGS has invalid type: %s" % type(callback_args)
    assert callback_kargs is None or isinstance(callback_kargs, dict), "CALLBACK_KARGS has invalid type: %s" % type(callback_kargs)
    # %s (not %d): formatting a type object with %d raises TypeError and hides the assertion
    assert isinstance(include_id, bool), "INCLUDE_ID has invalid type: %s" % type(include_id)
    if __debug__: dprint("persistent register ", call, " after ", delay, " seconds")

    with self._lock:
        # unregister(...) leaves a tombstone behind: the entry keeps its ID_ while the call is
        # replaced with None.  such entries must not block a new registration.
        # _requests entries: (deadline, priority, id, call, callback)
        # _expired entries:  (priority, deadline, id, None, call, callback)
        is_registered = False
        for tup in self._requests:
            if tup[2] == id_ and tup[3] is not None:
                is_registered = True
                break
        if not is_registered:
            for tup in self._expired:
                if tup[2] == id_ and tup[4] is not None:
                    is_registered = True
                    break

        if not is_registered:
            if delay <= 0.0:
                heappush(self._expired,
                         (-priority,
                          time(),
                          id_,
                          None,
                          (call, args + (id_,) if include_id else args, {} if kargs is None else kargs),
                          None if callback is None else (callback, callback_args, {} if callback_kargs is None else callback_kargs)))

            else:
                heappush(self._requests,
                         (delay + time(),
                          -priority,
                          id_,
                          (call, args + (id_,) if include_id else args, {} if kargs is None else kargs),
                          None if callback is None else (callback, callback_args, {} if callback_kargs is None else callback_kargs)))

            # wakeup if sleeping
            if not self._event_is_set():
                self._event_set()

        return id_
def replace_register(self, id_, call, args=(), kargs=None, delay=0.0, priority=0, callback=None, callback_args=(), callback_kargs=None, include_id=False):
    """
    Replace (if present) the currently registered call ID_ with CALL.

    This is a faster way to handle an unregister and register call: every queued entry with
    ID_ is disabled and CALL is scheduled while holding the lock once.  ID_ may be a
    (unicode)string or a numerical identifier returned by register(...), and may not be zero
    or empty.  All other parameters behave as in register(...).

    Returns ID_.
    """
    assert isinstance(id_, (basestring, int)), "ID_ has invalid type: %s" % type(id_)
    assert id_, "ID_ may not be zero or an empty (unicode)string"
    assert callable(call), "CALL must be callable"
    assert isinstance(args, tuple), "ARGS has invalid type: %s" % type(args)
    assert kargs is None or isinstance(kargs, dict), "KARGS has invalid type: %s" % type(kargs)
    assert isinstance(delay, float), "DELAY has invalid type: %s" % type(delay)
    assert isinstance(priority, int), "PRIORITY has invalid type: %s" % type(priority)
    assert callback is None or callable(callback), "CALLBACK must be None or callable"
    assert isinstance(callback_args, tuple), "CALLBACK_ARGS has invalid type: %s" % type(callback_args)
    assert callback_kargs is None or isinstance(callback_kargs, dict), "CALLBACK_KARGS has invalid type: %s" % type(callback_kargs)
    # %s (not %d): formatting a type object with %d raises TypeError and hides the assertion
    assert isinstance(include_id, bool), "INCLUDE_ID has invalid type: %s" % type(include_id)
    if __debug__: dprint("replace register ", call, " after ", delay, " seconds")

    with self._lock:
        # un-register: keep the ordering keys (and thereby the heap order) but drop the call and
        # callback, _loop ignores entries without a call
        for index, tup in enumerate(self._requests_mirror):
            if tup[2] == id_:
                self._requests_mirror[index] = (tup[0], tup[1], id_, None, None)
                if __debug__: dprint("in _requests: ", id_)

        for index, tup in enumerate(self._expired_mirror):
            if tup[2] == id_:
                self._expired_mirror[index] = (tup[0], tup[1], id_, tup[3], None, None)
                if __debug__: dprint("in _expired: ", id_)

        # register
        if delay <= 0.0:
            heappush(self._expired,
                     (-priority,
                      time(),
                      id_,
                      None,
                      (call, args + (id_,) if include_id else args, {} if kargs is None else kargs),
                      None if callback is None else (callback, callback_args, {} if callback_kargs is None else callback_kargs)))

        else:
            heappush(self._requests,
                     (delay + time(),
                      -priority,
                      id_,
                      (call, args + (id_,) if include_id else args, {} if kargs is None else kargs),
                      None if callback is None else (callback, callback_args, {} if callback_kargs is None else callback_kargs)))

        # wakeup if sleeping
        if not self._event_is_set():
            self._event_set()
        return id_
def unregister(self, id_):
    """
    Unregister a callback using the ID_ obtained from the register(...) method

    Every queued entry with ID_ is disabled in place: the entry keeps its ordering keys, and
    thereby its position in the heap, while its call and callback are replaced with None.
    _loop ignores entries without a call.

    ID_ must be a (unicode)string or a numerical identifier, and may not be zero or empty.
    """
    assert isinstance(id_, (basestring, int)), "ROOT_ID has invalid type: %s" % type(id_)
    assert id_, "ID_ may not be zero or an empty (unicode)string"
    if __debug__: dprint(id_)
    with self._lock:
        # un-register
        # _requests entries: (deadline, priority, id, call, callback)
        for index, tup in enumerate(self._requests_mirror):
            if tup[2] == id_:
                self._requests_mirror[index] = (tup[0], tup[1], id_, None, None)
                if __debug__: dprint("in _requests: ", id_)

        # _expired entries: (priority, deadline, id, None, call, callback).  the fourth item is
        # preserved as-is (tup[3]), as is done in replace_register(...)
        for index, tup in enumerate(self._expired_mirror):
            if tup[2] == id_:
                self._expired_mirror[index] = (tup[0], tup[1], id_, tup[3], None, None)
                if __debug__: dprint("in _expired: ", id_)
def call(self, call, args=(), kargs=None, delay=0.0, priority=0, id_="", include_id=False, timeout=0.0, default=None):
    """
    Perform CALL on the Callback thread, block until it has finished, and return its result.

    When invoked on the Callback thread itself CALL is simply made directly with ARGS and KARGS;
    DELAY, PRIORITY, ID_, INCLUDE_ID, and TIMEOUT are unused in that case.

    When invoked on any other thread CALL is passed to register(...) and the calling thread
    waits for the result.  TIMEOUT is the maximum amount of seconds to wait, 0.0 waits
    indefinitely.  When the wait times out DEFAULT is used as the result; the call itself is
    left registered.

    A result (or DEFAULT) that is an Exception instance is raised instead of returned.

    For the arguments CALL, ARGS, KARGS, DELAY, PRIORITY, ID_, and INCLUDE_ID: see the
    register(...) method.
    """
    assert isinstance(timeout, float)
    assert 0.0 <= timeout
    assert self._thread_ident

    # already on the callback thread: no scheduling required
    if self._thread_ident == get_ident():
        if kargs:
            return call(*args, **kargs)
        return call(*args)

    # single item list, filled by the callback thread once the call was made
    outcome = [default]
    finished = Event()

    def on_finished(result):
        outcome[0] = result
        finished.set()

    # register the call
    self.register(call, args, kargs, delay, priority, id_, on_finished, (), None, include_id)

    # wait for call to finish
    if timeout == 0.0:
        finished.wait(None)
    else:
        finished.wait(timeout)

    if isinstance(outcome[0], Exception):
        raise outcome[0]
    return outcome[0]
def start(self, name="Generic-Callback", wait=True):
    """
    Start the asynchronous thread.

    The state is set to STATE_PLEASE_RUN and a new daemon thread called NAME is created to run
    the _loop() method.  When WAIT is True this method polls until the state is no longer
    STATE_PLEASE_RUN.

    May only be used while the state is STATE_INIT.

    Returns True when the state is STATE_RUNNING at the time of returning.
    """
    assert self._state == "STATE_INIT", "Already (done) running"
    assert isinstance(name, str)
    assert isinstance(wait, bool), "WAIT has invalid type: %s" % type(wait)
    if __debug__:
        dprint()

    with self._lock:
        self._state = "STATE_PLEASE_RUN"
        if __debug__:
            dprint("STATE_PLEASE_RUN")
            self._debug_thread_name = name

    worker = Thread(target=self._loop, name=name)
    worker.daemon = True
    worker.start()

    # Wait until the thread has started
    while wait and self._state == "STATE_PLEASE_RUN":
        sleep(0.01)

    return self.is_running
def stop(self, timeout=10.0, wait=True, exception=None):
    """
    Stop the asynchronous thread.

    When the state is STATE_RUNNING it is changed into STATE_PLEASE_STOP and the thread is woken
    up; in any other state no stop is requested.  When EXCEPTION is given it is stored,
    together with the current sys.exc_info() traceback, and made available through the
    exception and exception_traceback properties.

    When WAIT is True, and we are not called on the Callback thread, the state is polled every
    0.01 seconds until it is no longer STATE_PLEASE_STOP or until roughly TIMEOUT seconds have
    passed.  When called with wait=True on the same thread we will return immediately.

    Returns the value of is_finished at the time of returning.
    """
    assert isinstance(timeout, float)
    assert isinstance(wait, bool)
    if __debug__: dprint()
    # NOTE(review): the state is tested before the lock is acquired
    if self._state == "STATE_RUNNING":
        with self._lock:
            if exception:
                self._exception = exception
                self._exception_traceback = exc_info()[2]
            self._state = "STATE_PLEASE_STOP"
            if __debug__: dprint("STATE_PLEASE_STOP")
            # wakeup if sleeping
            self._event.set()
    # waiting on the Callback thread itself would prevent _loop from ever handling the request
    if wait and not self._thread_ident == get_ident():
        while self._state == "STATE_PLEASE_STOP" and timeout > 0.0:
            sleep(0.01)
            timeout -= 0.01
        if __debug__:
            if timeout <= 0.0:
                dprint("timeout. perhaps callback.stop() was called on the same thread?")
    return self.is_finished
def loop(self):
    """
    Use the calling thread for this Callback instance.

    Sets the state to STATE_PLEASE_RUN and runs _loop() on the calling thread; hence this
    method returns only after _loop() returns.
    """
    if __debug__: dprint()
    with self._lock:
        self._state = "STATE_PLEASE_RUN"
        if __debug__: dprint("STATE_PLEASE_RUN")
    self._loop()
@attach_profiler
def _loop(self):
    """
    The body of the Callback thread.

    While the state is STATE_RUNNING: moves scheduled requests whose deadline has passed to the
    expired heap, handles one expired request per iteration (highest priority first), and
    sleeps on the event when there is nothing to handle.

    Once the state changes: new tasks are refused, the remaining expired tasks are called one
    last time, all remaining generators are closed, and the state becomes STATE_FINISHED.
    """
    if __debug__:
        dprint()
        # timestamp at which the expired queue was first seen to exceed 10 entries, 0 when unset
        time_since_expired = 0
    if prctl:
        prctl.set_name("Tribler" + currentThread().getName())
    # put some often used methods and object in the local namespace
    actual_time = 0
    event_clear = self._event.clear
    event_wait = self._event.wait
    event_is_set = self._event.isSet
    expired = self._expired
    get_timestamp = time
    lock = self._lock
    requests = self._requests
    self._thread_ident = get_ident()
    with lock:
        # the state may already have been changed, in that case the while loop exits at once
        if self._state == "STATE_PLEASE_RUN":
            self._state = "STATE_RUNNING"
            if __debug__: dprint("STATE_RUNNING")
    while 1:
        actual_time = get_timestamp()
        with lock:
            # check if we should continue to run
            if self._state != "STATE_RUNNING":
                break
            # move expired requests from REQUESTS to EXPIRED
            while requests and requests[0][0] <= actual_time:
                # notice that the deadline and priority entries are switched, hence, the entries in
                # the EXPIRED list are ordered by priority instead of deadline
                deadline, priority, root_id, call, callback = heappop(requests)
                heappush(expired, (priority, deadline, root_id, None, call, callback))
            if expired:
                if __debug__ and len(expired) > 10:
                    if not time_since_expired:
                        time_since_expired = actual_time
                # we need to handle the next call in line
                priority, deadline, root_id, _, call, callback = heappop(expired)
                wait = 0.0
                if __debug__:
                    self._debug_call_name = self._debug_call_to_string(call)
                # ignore removed tasks
                if call is None:
                    continue
            else:
                # there is nothing to handle
                wait = requests[0][0] - actual_time if requests else 300.0
                if __debug__:
                    dprint("nothing to handle, wait ", wait, " seconds")
                    if time_since_expired:
                        diff = actual_time - time_since_expired
                        if diff > 1.0:
                            dprint("took ", round(diff, 2), " to process expired queue", level="warning")
                        time_since_expired = 0
            if event_is_set():
                event_clear()
        if wait:
            if __debug__: dprint("%d wait at most %.3fs before next call, still have %d calls in queue" % (time(), wait, len(requests)))
            # returns early when register(...) or stop(...) sets the event
            event_wait(wait)
        else:
            if __debug__:
                dprint(self._debug_thread_name, "] calling ", self._debug_call_name, " (prio:", priority, ", id:", root_id, ")")
                debug_call_start = time()
            # call can be either:
            # 1. a generator
            # 2. a (callable, args, kargs) tuple
            try:
                if isinstance(call, TupleType):
                    # callback
                    result = call[0](*call[1], **call[2])
                    if isinstance(result, GeneratorType):
                        # we only received the generator, no actual call has been made to the
                        # function yet, therefore we call it again immediately
                        call = result
                    elif callback:
                        # schedule the callback with the returned value as its first argument
                        with lock:
                            heappush(expired, (priority, actual_time, root_id, None, (callback[0], (result,) + callback[1], callback[2]), None))
                if isinstance(call, GeneratorType):
                    # start next generator iteration
                    result = call.next()
                    assert isinstance(result, float), [type(result), call]
                    assert result >= 0.0, [result, call]
                    # reschedule RESULT seconds from now, keeping the priority, id, and callback
                    with lock:
                        heappush(requests, (get_timestamp() + result, priority, root_id, call, callback))
            except StopIteration:
                # the generator has finished
                # NOTE(review): RESULT is not assigned by the failing call.next().  it holds the
                # generator object when the generator was created in this iteration, otherwise it
                # appears to be left over from a previously handled call (or unbound) -- confirm
                # what the callback is supposed to receive
                if callback:
                    with lock:
                        heappush(expired, (priority, actual_time, root_id, None, (callback[0], (result,) + callback[1], callback[2]), None))
            except (SystemExit, KeyboardInterrupt, GeneratorExit, AssertionError), exception:
                # fatal: the state change makes the while loop exit on its next iteration
                dprint("attempting proper shutdown", exception=True, level="error")
                with lock:
                    self._state = "STATE_EXCEPTION"
                    self._exception = exception
                    self._exception_traceback = exc_info()[2]
                self._call_exception_handlers(exception, True)
            except Exception, exception:
                # schedule the callback with the exception as its first argument
                if callback:
                    with lock:
                        heappush(expired, (priority, actual_time, root_id, None, (callback[0], (exception,) + callback[1], callback[2]), None))
                if __debug__:
                    # when __debug__ is set every exception is treated as fatal
                    dprint("__debug__ only shutdown", exception=True, level="error")
                    with lock:
                        self._state = "STATE_EXCEPTION"
                        self._exception = exception
                        self._exception_traceback = exc_info()[2]
                    self._call_exception_handlers(exception, True)
                else:
                    dprint(exception=True, level="error")
                    self._call_exception_handlers(exception, False)
            if __debug__:
                debug_call_duration = time() - debug_call_start
                if debug_call_duration > 1.0:
                    dprint(round(debug_call_duration, 2), "s call to ", self._debug_call_name, level="warning")
    with lock:
        # allowing us to refuse any new tasks.  _requests_mirror and _expired_mirror will still
        # allow tasks to be removed
        self._requests = []
        self._expired = []
    # call all expired tasks and send GeneratorExit exceptions to expired generators, note that
    # new tasks will not be accepted
    if __debug__: dprint(self._debug_thread_name, "] there are ", len(expired), " expired tasks")
    while expired:
        _, _, _, _, call, callback = heappop(expired)
        if isinstance(call, TupleType):
            try:
                result = call[0](*call[1], **call[2])
            except:
                dprint(exception=True, level="error")
            else:
                if isinstance(result, GeneratorType):
                    # we only received the generator, no actual call has been made to the
                    # function yet, therefore we call it again immediately
                    call = result
                elif callback:
                    # during shutdown the callback is called directly instead of being scheduled
                    try:
                        callback[0](result, *callback[1], **callback[2])
                    except:
                        dprint(exception=True, level="error")
        if isinstance(call, GeneratorType):
            if __debug__: dprint("raise Shutdown in ", call)
            try:
                call.close()
            except:
                dprint(exception=True, level="error")
    # send GeneratorExit exceptions to scheduled generators
    if __debug__: dprint("there are ", len(requests), " scheduled tasks")
    while requests:
        _, _, _, call, _ = heappop(requests)
        if isinstance(call, GeneratorType):
            if __debug__: dprint("raise Shutdown in ", call)
            try:
                call.close()
            except:
                dprint(exception=True, level="error")
    # set state to finished
    with lock:
        if __debug__: dprint("STATE_FINISHED")
        self._state = "STATE_FINISHED"
if __debug__:
def main():
    """
    Manual debug script: starts two Callback threads (c and d), registers a series of test
    calls on them, and sleeps in between to give the calls time to run.

    NOTE(review): Switch and Idle are used below but are neither defined nor imported in this
    file, and call4 is defined but never registered -- parts of this script look stale.
    """
    c = Callback()
    c.start()
    d = Callback()
    d.start()
    # a plain call that is made after a delay
    def call1():
        dprint(time())
        sleep(2)
        dprint(time())
    c.register(call1, delay=1.0)
    sleep(2)
    dprint(line=1)
    # a generator that blocks the callback thread for a decreasing amount of time
    def call2():
        delay = 3.0
        for i in range(10):
            dprint(time(), " ", i)
            sleep(delay)
            if delay > 0.0:
                delay -= 1.0
            yield 1.0
    c.register(call2)
    sleep(11)
    dprint(line=1)
    # NOTE(review): yields Switch instances, _loop asserts that yielded values are floats
    def call3():
        delay = 3.0
        for i in range(10):
            dprint(time(), " ", i)
            yield Switch(d)
            # perform code on Callback d
            sleep(delay)
            if delay > 0.0:
                delay -= 1.0
            yield Switch(c)
            # perform code on Callback c
    c.register(call3)
    sleep(11.0)
    dprint(line=1)
    # CPU intensive call... should 'back off'
    # NOTE(review): never registered.  it reads the value of the yield expression, _loop
    # resumes generators with .next(), in which case that value is None
    def call4():
        for _ in xrange(10):
            sleep(2.0)
            desync = (yield 1.0)
            dprint("desync... ", desync)
            while desync > 0.1:
                dprint("backing off... ", desync)
                desync = (yield desync)
                dprint("next try... ", desync)
    dprint(line=1)
    def call5_bussy():
        for _ in xrange(10):
            desync = yield 0.0
            dprint("on bussy (", desync, ")")
            sleep(0.4)
    # NOTE(review): yields Idle instances, see the note on call3
    def call5_idle():
        for _ in xrange(10):
            desync = yield Idle()
            dprint("on idle (", desync, ")")
    c.register(call5_bussy)
    c.register(call5_idle)
    dprint(line=1)
    def call6():
        dprint("before")
        yield Idle(5.0)
        dprint("after")
    c.register(call6)
    # a generator that unregisters itself using the identifier returned by register(...)
    def call7():
        dprint("init")
        while True:
            yield 1.0
            dprint("-")
            c.unregister(task_id)
    task_id = c.register(call7)
    c.unregister(task_id)
    sleep(21.0)
    dprint(line=1)
    # call8 and call9 count how often they were called for each index
    def call8(index):
        container1[index] += 1
    def call9(index):
        container2[index] += 1
    # exercises register, unregister, and replace_register in bulk.  the asserts below expect
    # every index to be counted exactly twice in both containers
    def call10():
        indexes = range(len(container1))
        random.shuffle(indexes)
        # counted by call8: registered without an id, hence never removed
        for index in indexes:
            c.register(call8, (index,))
        # registered and immediately unregistered again, both expired and scheduled
        for index in indexes:
            c.register(call8, (index,), id_="debug-test-%s" % index)
        for index in xrange(len(container1)):
            c.unregister("debug-test-%s" % index)
        for index in indexes:
            c.register(call8, (index,), delay=60.0, id_="debug-test-2-%s" % index)
        for index in xrange(len(container1)):
            c.unregister("debug-test-2-%s" % index)
        # call8 is replaced by call9, both for expired and for scheduled entries
        for index in indexes:
            c.register(call8, (index,), id_="debug-test-3-%s" % index)
        for index in xrange(len(container1)):
            c.replace_register("debug-test-3-%s" % index, call9, (index,))
        for index in indexes:
            c.register(call8, (index,), delay=60.0, id_="debug-test-4-%s" % index)
        for index in xrange(len(container1)):
            c.replace_register("debug-test-4-%s" % index, call9, (index,))
        # counted by call8 after a one second delay
        for index in indexes:
            c.register(call8, (index,), delay=1.0)
    # imported here, call10 only looks up the name once it is called on the callback thread
    import random
    container1 = [0] * 1000
    container2 = [0] * len(container1)
    c.register(call10)
    sleep(10.0)
    assert all(value == 2 for value in container1), container1
    assert all(value == 2 for value in container2), container2
    d.stop()
    c.stop()
if __name__ == "__main__":
main()