-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathcan_command_server.py
executable file
·147 lines (128 loc) · 5.49 KB
/
can_command_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/usr/bin/env python3
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import argparse
import asyncio
import struct
import time
from threading import Thread
import can
class CanCommandServer:
command_configs = [
{
"canRequestId": 0x00000123, # standard 11-bit CAN ID
"canResponseId": 0x00000456, # standard 11-bit CAN ID
"actuatorName": "Vehicle.actuator6",
"argumentType": ">i", # big-endian int32
"inProgressCount": 0,
"status": 1, # SUCCEEDED
"reasonCode": 0x1234,
"reasonDescription": "hello",
"responseDelay": 1,
},
{
"canRequestId": 0x80000789, # extended 29-bit CAN ID (MSB sets extended)
"canResponseId": 0x80000ABC, # extended 29-bit CAN ID (MSB sets extended)
"actuatorName": "Vehicle.actuator7",
"argumentType": ">d", # big-endian double
"inProgressCount": 10,
"status": 1, # SUCCEEDED
"reasonCode": 0x5678,
"reasonDescription": "goodbye",
"responseDelay": 1,
},
]
args = {} # used to record the passed arguments for each execution
def is_extended_id(self, can_id):
return (can_id & 0x80000000) != 0
def mask_id(self, can_id):
return (0x1FFFFFFF if self.is_extended_id(can_id) else 0x7FF) & can_id
def pop_string(self):
for i in range(len(self.data)):
if self.data[i] == 0x00:
break
else:
raise RuntimeError("Could not find null terminator")
string_data = self.data[0:i]
self.data = self.data[i + 1 :]
return string_data.decode("utf-8")
def pop_arg(self, struct_type):
dummy = struct.pack(struct_type, 0)
arg_data = self.data[0 : len(dummy)]
self.data = self.data[len(dummy) :]
return struct.unpack(struct_type, arg_data)[0]
def push_string(self, val):
val += "\0"
self.data += val.encode("utf-8")
def push_arg(self, struct_type, val):
self.data += struct.pack(struct_type, val)
def handle_message(self, receive_msg):
for config in self.command_configs:
if receive_msg.arbitration_id == self.mask_id(
config["canRequestId"]
) and receive_msg.is_extended_id == self.is_extended_id(config["canRequestId"]):
break
else:
return
try:
self.data = bytearray(receive_msg.data)
command_id = self.pop_string()
issued_timestamp_ms = self.pop_arg(">Q")
execution_timeout_ms = self.pop_arg(">Q")
arg_value = self.pop_arg(config["argumentType"])
print(
f"Received request for {config['actuatorName']}"
f" with command id {command_id}, value {arg_value},"
f" issued timestamp {issued_timestamp_ms}, execution timeout {execution_timeout_ms}"
)
self.args[command_id] = arg_value
execution_state = {"in_progress_counter": config["inProgressCount"]}
def send_response():
status = 10 if execution_state["in_progress_counter"] > 0 else config["status"]
print(
f"Sending response to request for {config['actuatorName']}"
f" with command id {command_id}, status {status},"
f" reason code {config['reasonCode']},"
f" reason description {config['reasonDescription']}"
)
self.data = bytearray()
send_msg = can.Message()
send_msg.arbitration_id = self.mask_id(config["canResponseId"])
send_msg.is_extended_id = self.is_extended_id(config["canResponseId"])
send_msg.is_fd = True
self.push_string(command_id)
self.push_arg("B", status)
self.push_arg(">I", config["reasonCode"])
self.push_string(config["reasonDescription"])
send_msg.data = self.data
send_msg.dlc = len(self.data)
self.can_bus.send(send_msg)
if execution_state["in_progress_counter"] > 0:
execution_state["in_progress_counter"] -= 1
self.loop.call_later(config["responseDelay"], send_response)
self.loop.call_later(config["responseDelay"], send_response)
except Exception as e:
print(e)
def __init__(self, interface):
self.can_bus = can.Bus(interface, interface="socketcan", fd=True)
self.loop = asyncio.new_event_loop()
can.Notifier(bus=self.can_bus, listeners=[self.handle_message], loop=self.loop)
self.thread = Thread(name="CanCommandServer", target=self.loop.run_forever)
self.thread.start()
def stop(self):
async def stop_loop():
self.loop.stop()
asyncio.run_coroutine_threadsafe(stop_loop(), self.loop)
self.thread.join()
self.can_bus.shutdown()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Runs a server to respond to CAN command messages")
parser.add_argument("-i", "--interface", required=True, help="CAN interface, e.g. vcan0")
args = parser.parse_args()
command_server = CanCommandServer(args.interface)
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
pass
command_server.stop()