-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller.py
222 lines (170 loc) · 6.83 KB
/
controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import zmq
import can
import serial
import json
import logging
from typing import *
from roboclaw import Roboclaw
from math import copysign
from time import perf_counter
from threading import Lock
logging.basicConfig(format="[{asctime:<8}] {levelname:<10} {message}", style='{', datefmt='%H:%M:%S')
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def clamp(mn, mx, n):
return min(max(n, mn), mx)
# Checks if a specified delta was exceeded when calling `notify`. Thread-safe.
class Watchdog:
def __init__(self, expires: float):
# The last time the watchdog was notified.
self.prev_check = perf_counter()
self.expires = expires
self.lock = Lock()
# Update the watchdog to keep it alive.
def notify(self):
with self.lock:
self.prev_check = perf_counter()
# Check if the watchdog is alive.
@property
def alive(self) -> bool:
with self.lock:
return perf_counter() - self.prev_check < self.expires
class Spinner:
def __init__(self, uart_address: str = "/dev/ttyS1", uart_baud: int = 38400):
self.roboclaw = Roboclaw(serial.Serial(uart_address, uart_baud))
def spin(self, vel: float):
self.roboclaw.forward_backward_m1(min(64 + int(64 * vel), 127))
# Utility class to calculate wheel outputs for differential driving.
class DifferentialDrive:
# Shamelessly ripped from WPILib's differential drive
@staticmethod
def differential_ik(x_vel: float, z_rot: float) -> Tuple[float, float]:
x_vel = clamp(-1.0, 1.0, x_vel)
z_rot = clamp(-1.0, 1.0, z_rot)
# Square the inputs to make it less sensitive at low speed
x_vel = copysign(x_vel * x_vel, x_vel)
z_rot = copysign(z_rot * z_rot, z_rot)
speed_l = x_vel - z_rot
speed_r = x_vel + z_rot
greater = max(abs(x_vel), abs(z_rot))
lesser = min(abs(x_vel), abs(z_rot))
if greater == 0.0:
return (0.0, 0.0)
else:
saturated = (greater + lesser) / greater
speed_l /= saturated
speed_r /= saturated
return (speed_l, speed_r)
def __init__(self):
self.prev_time = perf_counter()
self.prev_wheels = (0.0, 0.0)
self.ramp = 1.0
def drive(self, x_vel: float, z_rot: float) -> (float, float):
delta = perf_counter() - self.prev_time # seconds
target = DifferentialDrive.differential_ik(x_vel, z_rot)
target_diff = (
min(target[0] - self.prev_wheels[0], delta * self.ramp),
min(target[1] - self.prev_wheels[1], delta * self.ramp),
)
out = (
clamp(-1.0, 1.0, self.prev_wheels[0] + target_diff[0]),
clamp(-1.0, 1.0, self.prev_wheels[1] + target_diff[1]),
)
self.prev_wheels = out
return out
class Drivetrain(DifferentialDrive):
def __init__(
self,
controller_id_l: int,
controller_id_r: int,
can_channel: str = "can0",
can_bitrate: int = 125_000
):
super().__init__()
self.controller_id_l = controller_id_l
self.controller_id_r = controller_id_r
self.can = can.Bus(bustype="socketcan", channel=can_channel, bitrate=can_bitrate)
# Track the last time a CAN command was sent to avoid saturating the CAN bus queue.
self.can_command_prev = perf_counter()
self.can_bitrate = 125_000
# Move the drivetrain. Skips sending CAN commands when called faster than 125kbits/s.
def drive(self, x_vel: float, z_rot: float) -> (float, float):
curr_time = perf_counter()
can_command_delta = curr_time - self.can_command_prev
if can_command_delta > 5.0:
logger.info("Sending CAN message.")
(l_duty, r_duty) = super().drive(x_vel, z_rot)
self.can.send(
can.Message(
arbitration_id=self.controller_id_l,
data=int(l_duty * 100_000).to_bytes(4, byteorder="big", signed=True),
is_extended_id=True
)
)
self.can.send(
can.Message(
arbitration_id=self.controller_id_r,
data=(-int(r_duty * 100_000)).to_bytes(4, byteorder="big", signed=True),
is_extended_id=True
)
)
self.can_command_prev = curr_time
return (l_duty, r_duty)
else:
return self.prev_wheels
class Controller:
HEART_ATTACK_THRESHOLD = 1.0
def __init__(self):
# Movement stuff
self.drivetrain = Drivetrain(0x00000000, 0x00000001)
self.spinner = Spinner()
# Networking
self.context = zmq.Context()
self.receiver = self.context.socket(zmq.PULL)
self.receiver.bind(f"tcp://*:5555")
logger.info("Socket bound.")
self.watchdog = Watchdog(Controller.HEART_ATTACK_THRESHOLD)
self.curr_command = None # dictionary
# Get the current controller command. If a new command is available, the current command is
# updated.
def command(self) -> Optional[dict]:
packet = None
# Consume all received packets until we get the latest one.
while True:
try:
packet = self.receiver.recv_string(flags=zmq.NOBLOCK)
self.watchdog.notify() # Make sure we notify the watchdog to keep it alive.
packet = packet.replace("\\", "").strip('"')
packet = json.loads(packet)
except zmq.Again as e:
break
if packet is not None:
self.curr_command = {k: v for (k, v) in dict(packet).items()}
logger.info(f"Received: \n{self.curr_command}")
return self.curr_command
# Run the main controller loop.
def update(self):
command_prev = None
command = self.command()
if self.watchdog.alive:
if command is not None:
right_stick = command["right_stick_y"]
left_stick = command["left_stick_y"]
right_trigger = command["right_trigger"]
left_trigger = command["left_trigger"]
#inverted = 1.0 if not command["invert_button"] else -1.0
self.drivetrain.drive(left_stick, right_stick)
self.spinner.spin(left_trigger - right_trigger)
command_prev = command
else:
logger.info("Controller not connected.")
self.drivetrain.drive(0.0, 0.0)
self.spinner.spin(0.0)
if command_prev != command:
# Reset the watchdog because the connection is back!
self.watchdog = Watchdog(Controller.HEART_ATTACK_THRESHOLD)
if __name__ == "__main__":
controller = Controller()
logger.info("Controller configured.")
while True:
controller.update()