This repository has been archived by the owner on Apr 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathrtd.py
83 lines (69 loc) · 2.5 KB
/
rtd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# partial modification on /rtde/examples/record.py
import argparse
import logging
import sys
sys.path.append('..')
import rtde.rtde as rtde
import rtde.rtde_config as rtde_config
import rtde.serialize as serialize
from time import sleep
def run():
# parameters
parser = argparse.ArgumentParser()
parser.add_argument('--host', default='10.20.0.25',
help='name of host to connect to (localhost)')
parser.add_argument('--port', type=int, default=30004,
help='port number (30004)')
parser.add_argument('--frequency', type=int, default=10,
help='the sampling frequency in Herz')
parser.add_argument('--config', default='record_configuration.xml',
help='data configuration file to use (record_configuration.xml)')
args = parser.parse_args()
print args.host
conf = rtde_config.ConfigFile(args.config)
output_names, output_types = conf.get_recipe('out')
con = rtde.RTDE(args.host, 30004)
con.connect()
# get controller version
con.get_controller_version()
# setup recipes
if not con.send_output_setup(output_names, output_types, frequency=args.frequency):
logging.error('Unable to configure output')
sys.exit()
# start data synchronization
if not con.send_start():
logging.error('Unable to start synchronization')
sys.exit()
# get the data
keep_running = True
while keep_running:
try:
state = con.receive()
if state is not None:
data = []
for i in range(len(output_names)):
size = serialize.get_item_size(output_types[i])
value = state.__dict__[output_names[i]]
if size > 1:
data.extend(value)
else:
data.append(value)
# first 3 need to be multiplied by 1000
for i in range(0, 3):
data[i] *= 1000
print(", ".join(str(v) for v in data))
else:
# lost connection retrying
print "Lost connection retrying..."
run()
except KeyboardInterrupt:
keep_running = False
con.send_pause()
con.disconnect()
sys.exit()
sleep(1/args.frequency)
con.send_pause()
con.disconnect()
# keep retrying to get the initial synchronization (connection)
while True:
run()