forked from danicampora/Micropython-scheduler
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpause.py
49 lines (40 loc) · 1.43 KB
/
pause.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# roundrobin.py Runs three threads in round robin fashion. Stops after a duration via a timeout thread.
# Author: Peter Hinch
# Copyright Peter Hinch 2016 Released under the MIT license
import pyb
from usched import Sched
status = ['terminated', 'running', 'paused']
# Run on MicroPython board bare hardware
# THREADS:
def stop(fTim, objSch): # Stop the scheduler after fTim seconds
yield fTim
objSch.stop()
def robin(text):
while True:
pyb.delay(100)
print(text)
yield
def pauser(objSched, thread):
for _ in range(3):
yield 1
objSched.pause(thread)
print('Thread 2 paused: status {}'.format(status[objSched.status(thread)]))
yield 1
objSched.resume(thread)
print('thread 2 resumed: status {}'.format(status[objSched.status(thread)]))
yield 1
objSched.stop(thread)
print('thread 2 killed: status {}'.format(status[objSched.status(thread)]))
yield
print('thread 2 no longer on threadlist: status {}'.format(status[objSched.status(thread)]))
# USER TEST PROGRAM
def test(duration = 0):
objSched = Sched()
objSched.add_thread(robin("Thread 1"))
th2 = objSched.add_thread(robin("Thread 2"))
objSched.add_thread(robin("Thread 3"))
objSched.add_thread(pauser(objSched, th2))
if duration:
objSched.add_thread(stop(duration, objSched)) # Kill after a period
objSched.run()
test(10)