From 496e127401f9944283fe31dec24bc71db9e68893 Mon Sep 17 00:00:00 2001 From: Abhay Deshpande Date: Thu, 9 May 2024 14:26:11 -0700 Subject: [PATCH 1/8] Add hooks to state --- tycho_demo/demo_interface.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/tycho_demo/demo_interface.py b/tycho_demo/demo_interface.py index e19d679..dda2f61 100644 --- a/tycho_demo/demo_interface.py +++ b/tycho_demo/demo_interface.py @@ -1,6 +1,7 @@ from __future__ import print_function import sys +from typing import Callable sys.path.append("/usr/lib/python3/dist-packages") # Enable the conda python interpreter to access ROS packages # Even if ROS installs the packages to the system python @@ -55,6 +56,15 @@ def __init__(self, arm, gains_file): self.controller_save_file = None self.res_estimator = None + # modes and hooks + self.modes: dict[str, Callable[[State, float], tuple[np.ndarray, np.ndarray]]] = {} + self.handlers: dict[str, Callable[[str, State], None]] = {} + self.onclose: list[Callable[[State], None]] = [] + # invoked with (state, prev_mode) + self.mode_change_hooks: list[Callable[[State, str], None]] = [] + self.pre_command_hooks: dict[str, list[Callable[[State], None]]] = {} + self.post_command_hooks: dict[str, list[Callable[[State], None]]] = {} + # For feedback self.current_position = np.empty(arm.dof_count, dtype=np.float64) self.current_velocity = np.empty(arm.dof_count, dtype=np.float64) @@ -206,7 +216,7 @@ def send_command_controller(state, timestamp=None): np.array(state.command_effort), np.array(pwm))) -def command_proc(state): +def command_proc(state: State): group = state.arm.group group.feedback_frequency = float(ROBOT_FEEDBACK_FREQUENCY) # Obtain update from the robot at this frequency state.command_smoother = Smoother(7, SMOOTHER_WINDOW_SIZE) # currently unused @@ -255,7 +265,11 @@ def command_proc(state): # Generating command assert current_mode in state.mode_keys + for fn in state.pre_command_hooks.get("*", []) + state.pre_command_hooks.get(current_mode, []): + fn(state) command_pos, command_vel = state.modes[current_mode](state, time()) + for fn in state.post_command_hooks.get("*", []) + state.post_command_hooks.get(current_mode, []): + fn(state) state.lock() @@ -393,12 +407,8 @@ def run_demo(callback_func=None, params=None, cmd_freq=0): _load_hebi_controller_gains('L', state) # Basic demo functions - modes = {} - onclose = [] - modes['idle'] = __idle + state.modes['idle'] = __idle state.handlers = init_default_handlers() - state.modes = modes - state.onclose = onclose state.params = params # Set command frequency @@ -426,10 +436,14 @@ def run_demo(callback_func=None, params=None, cmd_freq=0): while res != 'q' and not state.quit: print_and_cr('') if res in state.handlers_keys: + prev_mode = state.mode try: state.handlers[res](res, state) except Exception as e: print_and_cr(colors.bg.red + str(e) + colors.reset) + if state.mode != prev_mode: + for fn in state.mode_change_hooks: + fn(state, prev_mode) sleep(0.01) res = getch() From 5af8f4950763abf7d46482e70a10222ba57c69de Mon Sep 17 00:00:00 2001 From: Abhay Deshpande Date: Thu, 16 May 2024 17:22:05 -0700 Subject: [PATCH 2/8] Record data from ros topics --- tycho_demo/addon/__init__.py | 1 + tycho_demo/addon/ros_record.py | 34 ++++++++++++++++++++++++++++++++++ tycho_demo/demo_interface.py | 25 ++++++++++++++----------- 3 files changed, 49 insertions(+), 11 deletions(-) create mode 100644 tycho_demo/addon/ros_record.py diff --git a/tycho_demo/addon/__init__.py b/tycho_demo/addon/__init__.py index 2246730..a69aafc 100644 --- a/tycho_demo/addon/__init__.py +++ b/tycho_demo/addon/__init__.py @@ -3,6 +3,7 @@ from .recording import add_recording_function from .replay_joints import add_replay_function from .replay_pose import add_replay_pose_function +from .ros_record import add_ros_record_function from .tuning import add_tuning_function from .visualize import add_visualize_function from .grabbing import add_grabbing_function diff --git a/tycho_demo/addon/ros_record.py b/tycho_demo/addon/ros_record.py new file mode 100644 index 0000000..769c500 --- /dev/null +++ b/tycho_demo/addon/ros_record.py @@ -0,0 +1,34 @@ +from typing import List +from demo_interface import State +from threading import Lock + +from tycho_env.utils import GenericMessageSubscriber, print_and_cr, numpify + + +def record_topic(state: State, topic: str): + def callback(msg): + try: + value = numpify(msg) + with state.rostopic_mutex: + state.latest_ros_data[topic] = value + except Exception as e: + print_and_cr(f"ERROR: {str(e)}") + sub = GenericMessageSubscriber(topic, callback, queue_size=10) + state.topic_subs[topic] = sub + + +def add_ros_record_function(state: State, recorded_topics: List[str]): + state.topic_subs = {} + state.rostopic_mutex = Lock() + state.latest_ros_data = {} + + if len(recorded_topics) > 0: + state.pre_command_hooks["*"].append(pre_cmd_hook) + + for topic in recorded_topics: + record_topic(state, topic) + + +def pre_cmd_hook(state: State): + with state.rostopic_mutex: + state.info.update(state.latest_ros_data) diff --git a/tycho_demo/demo_interface.py b/tycho_demo/demo_interface.py index dda2f61..200ca8b 100644 --- a/tycho_demo/demo_interface.py +++ b/tycho_demo/demo_interface.py @@ -1,13 +1,14 @@ from __future__ import print_function import sys -from typing import Callable +from typing import Callable, List, Dict sys.path.append("/usr/lib/python3/dist-packages") # Enable the conda python interpreter to access ROS packages # Even if ROS installs the packages to the system python from time import time, sleep from threading import Lock, Thread +from collections import defaultdict import numpy as np # Ros @@ -27,7 +28,7 @@ # Local from tycho_demo.keyboard import getch -from tycho_demo.addon import add_snapping_function +from tycho_demo.addon import add_snapping_function, add_ros_record_function # Feedback frequency (Hz) ROBOT_FEEDBACK_FREQUENCY = 100 # How often to pull sensor info @@ -57,13 +58,14 @@ def __init__(self, arm, gains_file): self.res_estimator = None # modes and hooks - self.modes: dict[str, Callable[[State, float], tuple[np.ndarray, np.ndarray]]] = {} - self.handlers: dict[str, Callable[[str, State], None]] = {} - self.onclose: list[Callable[[State], None]] = [] + self.modes: Dict[str, Callable[[State, float], tuple[np.ndarray, np.ndarray]]] = {} + self.handlers: Dict[str, Callable[[str, State], None]] = {} + self.onclose: List[Callable[[State], None]] = [] # invoked with (state, prev_mode) - self.mode_change_hooks: list[Callable[[State, str], None]] = [] - self.pre_command_hooks: dict[str, list[Callable[[State], None]]] = {} - self.post_command_hooks: dict[str, list[Callable[[State], None]]] = {} + self.mode_change_hooks: List[Callable[[State, str], None]] = [] + self.pre_command_hooks: Dict[str, List[Callable[[State], None]]] = defaultdict(list) + self.post_command_hooks: Dict[str, List[Callable[[State], None]]] = defaultdict(list) + self.info = {} # everything here must be pickleable # For feedback self.current_position = np.empty(arm.dof_count, dtype=np.float64) @@ -265,10 +267,10 @@ def command_proc(state: State): # Generating command assert current_mode in state.mode_keys - for fn in state.pre_command_hooks.get("*", []) + state.pre_command_hooks.get(current_mode, []): + for fn in state.pre_command_hooks["*"] + state.pre_command_hooks[current_mode]: fn(state) command_pos, command_vel = state.modes[current_mode](state, time()) - for fn in state.post_command_hooks.get("*", []) + state.post_command_hooks.get(current_mode, []): + for fn in state.post_command_hooks["*"] + state.post_command_hooks[current_mode]: fn(state) state.lock() @@ -401,7 +403,7 @@ def __idle(state, curr_time): # Main thread switches running mode by accepting keyboard command ####################################################################### -def run_demo(callback_func=None, params=None, cmd_freq=0): +def run_demo(callback_func=None, params=None, recorded_topics=[], cmd_freq=0): params = params or {} state, _, _ = init_robotarm() _load_hebi_controller_gains('L', state) @@ -416,6 +418,7 @@ def run_demo(callback_func=None, params=None, cmd_freq=0): state.counter_skip_freq = round(ROBOT_FEEDBACK_FREQUENCY / cmd_freq) add_snapping_function(state) + add_ros_record_function(state, recorded_topics) # Caller install custom handlers if callback_func is not None: From 26888119fea02849496228ca9f64f79264e5bf16 Mon Sep 17 00:00:00 2001 From: Abhay Deshpande Date: Tue, 21 May 2024 16:35:10 -0700 Subject: [PATCH 3/8] Add logging and rewrite replay --- tycho_demo/addon/__init__.py | 5 +- tycho_demo/addon/logger.py | 110 ++++++++++++++++ tycho_demo/addon/recording.py | 206 ------------------------------ tycho_demo/addon/replay.py | 55 ++++++++ tycho_demo/addon/replay_joints.py | 64 ---------- tycho_demo/addon/replay_pose.py | 76 ----------- tycho_demo/addon/ros_record.py | 7 +- tycho_demo/demo_interface.py | 16 ++- tycho_demo/launch_demo.py | 3 +- tycho_demo/parse_log.py | 61 +++++++++ tycho_demo/utils/__init__.py | 3 +- tycho_demo/utils/util.py | 11 ++ 12 files changed, 254 insertions(+), 363 deletions(-) create mode 100644 tycho_demo/addon/logger.py delete mode 100644 tycho_demo/addon/recording.py create mode 100644 tycho_demo/addon/replay.py delete mode 100644 tycho_demo/addon/replay_joints.py delete mode 100644 tycho_demo/addon/replay_pose.py create mode 100644 tycho_demo/parse_log.py create mode 100644 tycho_demo/utils/util.py diff --git a/tycho_demo/addon/__init__.py b/tycho_demo/addon/__init__.py index a69aafc..fdbcc0a 100644 --- a/tycho_demo/addon/__init__.py +++ b/tycho_demo/addon/__init__.py @@ -1,8 +1,7 @@ from .snapping import add_snapping_function from .moving import add_moving_function -from .recording import add_recording_function -from .replay_joints import add_replay_function -from .replay_pose import add_replay_pose_function +from .logger import add_logger_function +from .replay import add_replay_function from .ros_record import add_ros_record_function from .tuning import add_tuning_function from .visualize import add_visualize_function diff --git a/tycho_demo/addon/logger.py b/tycho_demo/addon/logger.py new file mode 100644 index 0000000..13550ed --- /dev/null +++ b/tycho_demo/addon/logger.py @@ -0,0 +1,110 @@ +from io import TextIOWrapper +from typing import Optional +import os + +from queue import Queue +from threading import Lock, Thread +from copy import deepcopy +import time +from pickle import Pickler + +from tycho_env.utils import print_and_cr, colors + +# Singletons +STATE_QUEUE = Queue() +IS_RECORDING = False # Similar to state.curr_recording but flag for callback +WRITER_LOCK = Lock() +CURR_WRITER: Optional[Pickler] = None +CURR_FILE: Optional[TextIOWrapper] = None + +def add_logger_function(state): + state.handlers['r'] = _record + state.handlers['R'] = _count_recording + state.handlers['D'] = _delete_recording + state.onclose.append(stop_recording) + state.post_command_hooks["*"].append(post_cmd_callback) + + state.last_recording = None + state.curr_recording = None + + if "save_record_folder" in state.params: + state.save_record_folder = state.params["save_record_folder"] + else: + dir_path = os.path.dirname(os.path.realpath(__file__)) + state.save_record_folder = os.path.join(dir_path, "..", "..", "recording") + if not os.path.isdir(state.save_record_folder): + print_and_cr(f"Creating folder for recordings: {state.save_record_folder}") + os.mkdir(state.save_record_folder) + + Thread(target=recording_worker, daemon=True).start() + +def recording_worker(): + while True: + info = STATE_QUEUE.get() + try: + with WRITER_LOCK: + CURR_WRITER.dump(info) + finally: + STATE_QUEUE.task_done() + +def post_cmd_callback(state): + if IS_RECORDING: + info = deepcopy(state.info) + STATE_QUEUE.put(info) + +def _record(_, state): + toggle_recording(state) + +def _count_recording(_, state): + n_recordings = len([f for f in os.listdir(state.save_record_folder) if f.endswith(".pkl")]) + print_and_cr(f"Number of recordings: {n_recordings}") + +def _delete_recording(_, state): + delete_last_recording(state) + +def delete_last_recording(state): + if state.curr_recording: + stop_recording(state) + if state.last_recording: + print_and_cr(colors.bg.red + 'Deleting last recording' + colors.reset) + os.remove(state.last_recording) + state.last_recording = None + +def set_recording(state, enabled: bool): + if enabled != bool(state.curr_recording): + toggle_recording(state) + +def start_recording(state): + if state.curr_recording: + stop_recording(state) + record_path = os.path.join(state.save_record_folder, + time.strftime('%y-%m-%d-%H-%M-%S.pkl', time.localtime())) + print_and_cr(colors.bg.green + f"Recording to: {record_path}") + + global CURR_WRITER, IS_RECORDING, CURR_FILE + assert STATE_QUEUE.empty() + with WRITER_LOCK: + CURR_FILE = open(record_path, "wb") + CURR_WRITER = Pickler(CURR_FILE) + IS_RECORDING = True + state.curr_recording = record_path + state.last_recording = record_path + +def stop_recording(state): + if state.curr_recording: + print_and_cr(colors.bg.lightgrey + "Stop recording" + colors.reset) + + global IS_RECORDING, CURR_WRITER, CURR_FILE + IS_RECORDING = False + STATE_QUEUE.join() + with WRITER_LOCK: + CURR_FILE.close() + CURR_FILE = None + CURR_WRITER = None + state.curr_recording = None + +def toggle_recording(state): + if state.curr_recording: + stop_recording(state) + else: + start_recording(state) diff --git a/tycho_demo/addon/recording.py b/tycho_demo/addon/recording.py deleted file mode 100644 index 5eb04aa..0000000 --- a/tycho_demo/addon/recording.py +++ /dev/null @@ -1,206 +0,0 @@ -####################################################################### -# Recording -# ------------------------------------------------------------------------------ -# Record selected topics to a rosbag -# Which topic to record? is stored in state.ros_record_topics -# - 'r' to record -# - 'D' to delete the last record (can only delete one last record) -# - 'R' to count the number of recordings in the folder -# -# To change the topics to record, edit state.ros_record_topics before you press the record button -# It's a list of topic names. -####################################################################### - -import os - -from tycho_env.utils import print_and_cr, colors -from time import strftime, localtime, sleep -from threading import Lock, Thread -from queue import Queue -from functools import partial -from typing import Dict - -import rospy -import rosbag -import rostopic - - -# Singleton -RECORD_LOCK = Lock() -MSG_QUEUE = Queue() -BAG_WRITERS: Dict[str, rosbag.Bag] = {} -ROSBAG_RECORDING = False -RECORDED_TOPICS = set() -WORKER_LAUNCHED = False - -def subscriber_callback(bagname, topic, msg): - if ROSBAG_RECORDING: - MSG_QUEUE.put((bagname, topic, msg)) - -def recording_worker(): - while True: - bagname, topic, msg = MSG_QUEUE.get() - try: - with RECORD_LOCK: - try: - writer: rosbag.Bag = BAG_WRITERS[bagname] - writer.write(topic, msg) - except KeyError: - print_and_cr(f"ERR: No bag name: {bagname}") - finally: - MSG_QUEUE.task_done() - -def launch_recording_subs(pose_topics, dict_topics): - global RECORDED_TOPICS, WORKER_LAUNCHED - unknown_topics = [] - for topic in pose_topics: - if topic not in RECORDED_TOPICS: - TopicType, _, _ = rostopic.get_topic_class(topic) - if TopicType is not None: - callback = partial(subscriber_callback, "pose", topic) - rospy.Subscriber(topic, TopicType, callback, queue_size=10) - RECORDED_TOPICS.add(topic) - else: - unknown_topics.append(topic) - for bagname, topic in dict_topics.items(): - if topic not in RECORDED_TOPICS: - TopicType, _, _ = rostopic.get_topic_class(topic) - if TopicType is not None: - callback = partial(subscriber_callback, bagname, topic) - rospy.Subscriber(topic, TopicType, callback, queue_size=10) - RECORDED_TOPICS.add(topic) - else: - unknown_topics.append(topic) - if not WORKER_LAUNCHED: - Thread(target=recording_worker, daemon=True).start() - WORKER_LAUNCHED = True - print_and_cr(f"Following topics will not be recorded: {', '.join(unknown_topics)}") - -def add_recording_function(state): - state.handlers['r'] = _record - state.handlers['D'] = _delete_recording - state.handlers['R'] = _count_recording - state.handlers['f'] = _relabel_failure_recording - state.rosbag_recording_to = False - state.ros_record_topics = [ - '/joint_states', '/joint_commands', - '/MocapPointArray', - '/Choppose', '/Choppose_target', - '/Ball/point', '/target/pose', - '/R0/point', '/R1/point', - '/R2/point', - ] - state.ros_record_dicts = { - 'camera_kinect': '/azcam_front/rgb/image_raw/compressed' - } - state.onclose.append(_stop_recording_on_quit) - - if 'save_record_folder' in state.params: - state.save_record_folder = state.params['save_record_folder'] - else: - dir_path = os.path.dirname(os.path.realpath(__file__)) - state.save_record_folder = os.path.join(dir_path, '..', 'recording') - if not os.path.isdir(state.save_record_folder): - print(f"Attempt to create the recording folder: {state.save_record_folder}") - os.mkdir(state.save_record_folder) - -def _record(key, state): - toggle_rosbag_recording(state) - -def _delete_recording(key, state): - if state.rosbag_recording_to: # Stop recording - stop_rosbag_recording(state.ros_record_dicts) - state.rosbag_recording_to = False - if state.last_rosbag is not None: - delete_recording(state.last_rosbag, state.ros_record_dicts) - state.last_rosbag = None - -def _count_recording(key, state): - number_of_recordings = len( [f for f in os.listdir(state.save_record_folder) - if f.endswith('-pose.bag')]) - print("Number of recordings:", number_of_recordings) - -def _stop_recording_on_quit(state): - if state.rosbag_recording_to: - stop_rosbag_recording(state.ros_record_dicts) - -def _relabel_failure_recording(key, state): - if state.rosbag_recording_to: # Stop recording - stop_rosbag_recording(state.ros_record_dicts) - state.rosbag_recording_to = False - if state.last_rosbag is not None: - label_failure_demo(state.last_rosbag, state.ros_record_dicts) - -# ------------------------------------------------------------------------------ - -def start_rosbag_recording(record_prefix, pose_topics, dict_topics): - print_and_cr(colors.bg.green + 'Recording to rosbag {}'.format( - os.path.basename(record_prefix))) - - launch_recording_subs(pose_topics, dict_topics) - - def create_writer(bagname): - return rosbag.Bag(f"{record_prefix}-{bagname}.bag", "w") - - with RECORD_LOCK: - if len(BAG_WRITERS) != 0 or not MSG_QUEUE.empty(): - print_and_cr("ERR: Recording already in progress!") - return - - BAG_WRITERS["pose"] = create_writer("pose") - for bagname in dict_topics: - BAG_WRITERS[bagname] = create_writer(bagname) - global ROSBAG_RECORDING - ROSBAG_RECORDING = True - -def stop_rosbag_recording(dict_topics): - print_and_cr(colors.bg.lightgrey + f'Stop rosbag recording' + colors.reset) - - global ROSBAG_RECORDING - ROSBAG_RECORDING = False - MSG_QUEUE.join() - with RECORD_LOCK: - for _, writer in BAG_WRITERS.items(): - writer.close() - BAG_WRITERS.clear() - -def delete_recording(rosbag_recording_to, dict_topics): - print_and_cr(colors.bg.red + 'Deleting rosbag recording' + colors.reset) - list_fn = [rosbag_recording_to+'-pose.bag'] + [ - f'{rosbag_recording_to}-{_topic}.bag' for _topic in dict_topics.keys()] - for fn in list_fn: - while not os.path.isfile(fn): - sleep(0.05) - os.remove(fn) - -def label_failure_demo(rosbag_recording_to, dict_topics): - print_and_cr(colors.bg.red + 'Re-label rosbag recording' + colors.reset) - list_fn = [rosbag_recording_to+'-pose.bag'] + [ - f'{rosbag_recording_to}-{_topic}.bag' for _topic in dict_topics.keys()] - for fn in list_fn: - while not os.path.isfile(fn): - sleep(0.05) - split_fn = fn.split('/') - new_fn = split_fn[-1].split('-') - new_fn[-1] = 'fail-'+new_fn[-1] - split_fn[-1] = '-'.join(new_fn) - cmd = "mv {} {}".format(fn, '/'.join(split_fn)) - os.system(cmd) - -def set_rosbag_recording(state, enabled: bool): - if enabled != bool(state.rosbag_recording_to): - toggle_rosbag_recording(state) - -def toggle_rosbag_recording(state): - if state.rosbag_recording_to: # Stop recording if it has been running - stop_rosbag_recording(state.ros_record_dicts) - state.rosbag_recording_to = False - else: # Start recording - rosbag_recording_to = os.path.join( - state.save_record_folder, - strftime('%y-%m-%d-%H-%M-%S', localtime())) - state.rosbag_recording_to = True - state.last_rosbag = rosbag_recording_to - start_rosbag_recording(rosbag_recording_to, - state.ros_record_topics, - state.ros_record_dicts) diff --git a/tycho_demo/addon/replay.py b/tycho_demo/addon/replay.py new file mode 100644 index 0000000..865e7ff --- /dev/null +++ b/tycho_demo/addon/replay.py @@ -0,0 +1,55 @@ +from tycho_env.utils import print_and_cr, construct_command +from tycho_demo.utils import read_raw_log + +def add_replay_function(state): + state.handlers['p'] = _replay_pose + state.handlers['P'] = _replay_joints + state.modes["replay"] = __replay + state.modes["wait_for_replay"] = __wait_for_replay + state.replay_recording = None + +def _replay_pose(_, state): + state.replay_type = "pose" + replay(state) + +def _replay_joints(_, state): + state.replay_type = "joints" + replay(state) + +def replay(state): + state.wait_for_replay_pos = list(state.current_position) + with state._mutex: + state.mode = "wait_for_replay" + state.replay_recording = None + while state.replay_recording is None: + recording_path = input("Recording to replay: ") + try: + recording = list(read_raw_log(recording_path)) + with state._mutex: + state.replay_idx = 0 + state.replay_recording = recording + except: + print_and_cr("Couldn't open given recording path!") + state.mode = "replay" + print_and_cr("Starting replay!") + +def __wait_for_replay(state, _): + return state.wait_for_replay_pos, [None] * 7 + +def __replay(state, _): + with state._mutex: + if state.replay_recording is not None: + s = state.replay_recording[state.replay_idx] + if state.replay_type == "pose": + target_choppose = s["target_choppose"] + target_pos = construct_command(state.arm, state.current_position, target_vector=target_choppose) + elif state.replay_type == "joints": + target_pos = s["target_position"] + state.replay_idx += 1 + if state.replay_idx == len(state.replay_recording): + print_and_cr(f"Finished replay!") + state.wait_for_replay_pos = target_pos + state.mode = "wait_for_replay" + else: + target_pos = state.current_position + return target_pos, [None] * 7 diff --git a/tycho_demo/addon/replay_joints.py b/tycho_demo/addon/replay_joints.py deleted file mode 100644 index d84c7c5..0000000 --- a/tycho_demo/addon/replay_joints.py +++ /dev/null @@ -1,64 +0,0 @@ -from threading import Lock, Thread -import numpy as np -from tycho_env.utils import print_and_cr -import rospy -from sensor_msgs.msg import JointState - -from tycho_env.utils import construct_choppose - -####################################################################### -# Replay -# ------------------------------------------------------------------------------ -# Replay a recording rosbag and command the robot to repeat the movement -####################################################################### - -REPLAY_INIT_THREAD = None -TARGET_POSE_TOPIC = '/joint_commands_replay' - -def add_replay_function(state): - state.handlers['p'] = _replay - state.modes['replay'] = __replay - state.modes['wait_for_replay'] = __wait_for_replay - state._replay_lock = Lock() - launch_replay_subscriber(state) - -def _replay(key, state): - state.lock() - global REPLAY_INIT_THREAD - if (state.mode != 'replay' and - (REPLAY_INIT_THREAD is None or not REPLAY_INIT_THREAD.isAlive())): - print_and_cr("Entering replay mode ... initializing ...") - state.mode = 'wait_for_replay' - state.fix_position = list(state.current_position) - REPLAY_INIT_THREAD = Thread(target=init_replay_topic, args=(state,)) - REPLAY_INIT_THREAD.start() - state.unlock() - -def __replay(state, cur_time): - state._replay_lock.acquire() - cmd = list(state.last_replay_cmd) - state._replay_lock.release() - print_and_cr(f"Curr pose = {construct_choppose(state.arm, state.current_position)}") - print_and_cr(f"Target = {construct_choppose(state.arm, cmd)}") - return cmd, [None] * 7 - -def __wait_for_replay(state, cur_time): - return state.fix_position, [None] * 7 - -def launch_replay_subscriber(state): - print_and_cr('Launch Ros Subscriber for replaying ... ') - def callback(data): - if state.mode == 'replay': - state._replay_lock.acquire() - state.last_replay_cmd = list(data.position) - state._replay_lock.release() - rospy.Subscriber(TARGET_POSE_TOPIC, JointState, callback, queue_size=1) - -def init_replay_topic(state): - replay_msg = rospy.wait_for_message(TARGET_POSE_TOPIC, JointState) - state.last_replay_cmd = list(replay_msg.position) - rospy.loginfo( - 'Established connection with target topic {}'.format(TARGET_POSE_TOPIC)) - if state.mode == 'wait_for_replay': - state.mode = 'replay' - print_and_cr('Entered replay mode') \ No newline at end of file diff --git a/tycho_demo/addon/replay_pose.py b/tycho_demo/addon/replay_pose.py deleted file mode 100644 index 823ab5e..0000000 --- a/tycho_demo/addon/replay_pose.py +++ /dev/null @@ -1,76 +0,0 @@ -from threading import Lock, Thread -import numpy as np -from scipy.spatial.transform import Rotation as scipyR -import rospy - -from tycho_env.utils import print_and_cr, construct_command -from tycho_demo_ros.msg import ChopPose - -####################################################################### -# Replay -# ------------------------------------------------------------------------------ -# Replay a recording rosbag and command the robot to repeat the movement -# Specifically, replay chopstick pose -####################################################################### - -REPLAY_INIT_THREAD = None -TARGET_POSE_TOPIC = '/Choppose_target2' - -def add_replay_pose_function(state): - state.handlers['p'] = _replay - state.modes['replay'] = __replay - state.modes['wait_for_replay'] = __wait_for_replay - state._replay_lock = Lock() - launch_replay_subscriber(state) - -def _replay(key, state): - state.lock() - global REPLAY_INIT_THREAD - if (state.mode != 'replay' and - (REPLAY_INIT_THREAD is None or not REPLAY_INIT_THREAD.isAlive())): - print_and_cr("Entering replay mode ... initializing ...") - print_and_cr("Please remap rostopic with: ") - print_and_cr("rosbag play 22-07-19-23-35-32-pose.bag --topics /Choppose_target /Choppose_target:=/Choppose_target2") - state.mode = 'wait_for_replay' - state.fix_position = list(state.current_position) - REPLAY_INIT_THREAD = Thread(target=init_replay_topic, args=(state,)) - REPLAY_INIT_THREAD.start() - state.unlock() - -def __replay(state, cur_time): - target_transform = np.zeros((3,4)) - state._replay_lock.acquire() - target_open = state.last_replay_open - _p = state.last_replay_pose.position - _q = state.last_replay_pose.orientation - target_transform[0:3, 3] = [_p.x, _p.y, _p.z] - target_transform[0:3,0:3] = scipyR.from_quat([_q.x, _q.y, _q.z, _q.w]).as_matrix() - state._replay_lock.release() - pos_cmd = construct_command(state.arm, state.current_position, - target_transformation=target_transform, target_open=target_open) - return pos_cmd, [None] * 7 - -def __wait_for_replay(state, cur_time): - return state.fix_position, [None] * 7 - -def launch_replay_subscriber(state): - print_and_cr('Launch Ros Subscriber for replaying ... ') - def callback(data): - if state.mode == 'replay': - state._replay_lock.acquire() - state.last_replay_pose = data.pose - state.last_replay_open = data.open - state._replay_lock.release() - rospy.Subscriber(TARGET_POSE_TOPIC, ChopPose, callback, queue_size=1) - -def init_replay_topic(state): - replay_msg = rospy.wait_for_message(TARGET_POSE_TOPIC, ChopPose) - rospy.loginfo( - 'Established connection with target topic {}'.format(TARGET_POSE_TOPIC)) - state._replay_lock.acquire() - state.last_replay_pose = replay_msg.pose - state.last_replay_open = replay_msg.open - state._replay_lock.release() - if state.mode == 'wait_for_replay': - state.mode = 'replay' - print_and_cr('Entered replay mode') diff --git a/tycho_demo/addon/ros_record.py b/tycho_demo/addon/ros_record.py index 769c500..3f5008c 100644 --- a/tycho_demo/addon/ros_record.py +++ b/tycho_demo/addon/ros_record.py @@ -1,11 +1,10 @@ from typing import List -from demo_interface import State from threading import Lock from tycho_env.utils import GenericMessageSubscriber, print_and_cr, numpify -def record_topic(state: State, topic: str): +def record_topic(state, topic: str): def callback(msg): try: value = numpify(msg) @@ -17,7 +16,7 @@ def callback(msg): state.topic_subs[topic] = sub -def add_ros_record_function(state: State, recorded_topics: List[str]): +def add_ros_record_function(state, recorded_topics: List[str]): state.topic_subs = {} state.rostopic_mutex = Lock() state.latest_ros_data = {} @@ -29,6 +28,6 @@ def add_ros_record_function(state: State, recorded_topics: List[str]): record_topic(state, topic) -def pre_cmd_hook(state: State): +def pre_cmd_hook(state): with state.rostopic_mutex: state.info.update(state.latest_ros_data) diff --git a/tycho_demo/demo_interface.py b/tycho_demo/demo_interface.py index 200ca8b..776f28b 100644 --- a/tycho_demo/demo_interface.py +++ b/tycho_demo/demo_interface.py @@ -22,13 +22,14 @@ from tycho_env.utils import ( get_gains_path, load_gain, print_and_cr, colors, + construct_choppose, euler_angles_from_rotation_matrix) # Import Constant from tycho_env.utils import OFFSET_JOINTS, SMOOTHER_WINDOW_SIZE # Local from tycho_demo.keyboard import getch -from tycho_demo.addon import add_snapping_function, add_ros_record_function +from tycho_demo.addon import add_snapping_function, add_ros_record_function, add_logger_function # Feedback frequency (Hz) ROBOT_FEEDBACK_FREQUENCY = 100 # How often to pull sensor info @@ -53,7 +54,6 @@ def __init__(self, arm, gains_file): self.arm = arm self.gains_file = gains_file self.publishers = [] - self.rosbag_recording_to = None self.controller_save_file = None self.res_estimator = None @@ -181,10 +181,6 @@ def send_command(state, timestamp): hebi_command.position = np.array(command_pos) - OFFSET_JOINTS hebi_command.velocity = np.array(command_vel) hebi_command.effort = state.arm._get_grav_comp_efforts(state.current_position).copy() - # print_and_cr(f"command_position = {hebi_command.position}") - # print_and_cr(f"command_velocity = {hebi_command.velocity }") - # print_and_cr(f"command_effort = {hebi_command.effort}") - state.arm.group.send_command(hebi_command) if state.controller_save_file: @@ -266,12 +262,17 @@ def command_proc(state: State): state.print_state = False # Generating command + t = time() + state.info["curr_time"] = t + state.info["curr_position"] = state.current_position + state.info["curr_choppose"] = construct_choppose(state.arm, state.current_position) assert current_mode in state.mode_keys for fn in state.pre_command_hooks["*"] + state.pre_command_hooks[current_mode]: fn(state) - command_pos, command_vel = state.modes[current_mode](state, time()) + command_pos, command_vel = state.modes[current_mode](state, t) for fn in state.post_command_hooks["*"] + state.post_command_hooks[current_mode]: fn(state) + state.info["target_position"] = command_pos state.lock() @@ -419,6 +420,7 @@ def run_demo(callback_func=None, params=None, recorded_topics=[], cmd_freq=0): add_snapping_function(state) add_ros_record_function(state, recorded_topics) + add_logger_function(state) # Caller install custom handlers if callback_func is not None: diff --git a/tycho_demo/launch_demo.py b/tycho_demo/launch_demo.py index 9f60d58..6768613 100644 --- a/tycho_demo/launch_demo.py +++ b/tycho_demo/launch_demo.py @@ -2,11 +2,10 @@ import rospy from tycho_demo import run_demo -from tycho_demo.addon import add_recording_function, add_tuning_function,\ +from tycho_demo.addon import add_tuning_function,\ add_visualize_function, add_replay_function def handler_installer(state): - add_recording_function(state) add_visualize_function(state) add_replay_function(state) add_tuning_function(state) diff --git a/tycho_demo/parse_log.py b/tycho_demo/parse_log.py new file mode 100644 index 0000000..96390fe --- /dev/null +++ b/tycho_demo/parse_log.py @@ -0,0 +1,61 @@ +import argparse +import pickle +from typing import Any, Dict, List, Generator +import numpy as np + +from tycho_demo.utils import read_raw_log + +def get_args(): + parser = argparse.ArgumentParser(description="Dump recorded logs into a D4RL-style dataset") + parser.add_argument("log_paths", nargs="+", help="The log files to parse") + parser.add_argument("out_path", help="Where to save the created dataset") + parser.add_argument("-o", "--obs_keys", nargs="+", help="The state keys to concatenate into an obs vector, if specified") + parser.add_argument("-a", "--act_keys", nargs="+", help="The state keys to concatenate into an action vector, if specified") + return parser.parse_args() + +def lod_to_dol(lod: List[Dict[Any, Any]]) -> Dict[Any, list]: + """Convert list of dicts to dict of lists""" + if len(lod) == 0: + return {} + combined = {k: [v] for k,v in lod[0].items()} + for d in lod[1:]: + assert len(d) == len(combined), "Some recordings have different keys!" + for k, v in d.items(): + assert k in combined.keys(), "Some recordings have different keys!" + combined[k].append(v) + return combined + +def main(): + args = get_args() + + trajs = [] + for path in args.log_paths: + lod = list(read_raw_log(path)) + traj = lod_to_dol(lod) + + # try to stack numpy arrays where possible + for k, v in traj.items(): + try: + v_stacked = np.stack(v, axis=0) + traj[k] = v_stacked + except: + pass + + if args.obs_keys: + assert "observations" not in traj, "observations already in log!" + obs = [traj[k] for k in args.obs_keys] + obs = np.concatenate(obs, axis=-1) + traj["observations"] = obs + if args.act_keys: + assert "actions" not in traj, "actions already in log!" + act = [traj[k] for k in args.act_keys] + act = np.concatenate(act, axis=-1) + traj["actions"] = act + + trajs.append(traj) + + with open(args.out_path, "wb") as f: + pickle.dump(trajs, f) + +if __name__ == "__main__": + main() diff --git a/tycho_demo/utils/__init__.py b/tycho_demo/utils/__init__.py index 3b1e5bc..ecc03f4 100644 --- a/tycho_demo/utils/__init__.py +++ b/tycho_demo/utils/__init__.py @@ -1,2 +1,3 @@ from .marker_pub import * -from .chop_pub import * \ No newline at end of file +from .chop_pub import * +from .util import * diff --git a/tycho_demo/utils/util.py b/tycho_demo/utils/util.py new file mode 100644 index 0000000..6af34e0 --- /dev/null +++ b/tycho_demo/utils/util.py @@ -0,0 +1,11 @@ +from pickle import Unpickler +from typing import Generator, Dict, Any + +def read_raw_log(path: str) -> Generator[Dict[Any, list], None, None]: + with open(path, "rb") as f: + reader = Unpickler(f) + try: + while True: + yield reader.load() + except EOFError: + pass From 1664ab3fc25b2219bcb78543705615140f31bfd3 Mon Sep 17 00:00:00 2001 From: Abhay Deshpande Date: Tue, 21 May 2024 17:22:17 -0700 Subject: [PATCH 4/8] Fix locking bug and catch mode changes from proc thread --- tycho_demo/demo_interface.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tycho_demo/demo_interface.py b/tycho_demo/demo_interface.py index 776f28b..18787d5 100644 --- a/tycho_demo/demo_interface.py +++ b/tycho_demo/demo_interface.py @@ -274,6 +274,12 @@ def command_proc(state: State): fn(state) state.info["target_position"] = command_pos + with state._mutex: + new_mode = state.mode + if new_mode != current_mode: + for fn in state.mode_change_hooks: + fn(state, current_mode) + state.lock() # Check for IK jump, apply smoother, and send out command @@ -441,12 +447,15 @@ def run_demo(callback_func=None, params=None, recorded_topics=[], cmd_freq=0): while res != 'q' and not state.quit: print_and_cr('') if res in state.handlers_keys: - prev_mode = state.mode + with state._mutex: + prev_mode = state.mode try: state.handlers[res](res, state) except Exception as e: print_and_cr(colors.bg.red + str(e) + colors.reset) - if state.mode != prev_mode: + with state._mutex: + new_mode = state.mode + if new_mode != prev_mode: for fn in state.mode_change_hooks: fn(state, prev_mode) sleep(0.01) From e2fd44de6096a62a8761e80f77ca01f45b954532 Mon Sep 17 00:00:00 2001 From: Abhay Deshpande Date: Wed, 5 Jun 2024 20:19:43 -0700 Subject: [PATCH 5/8] Remove mode change callback, add documentation --- README.md | 18 +++++++++++++++++- tycho_demo/demo_interface.py | 16 +--------------- tycho_demo/utils/util.py | 13 +++++++++++++ 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 8771d64..c21ce85 100644 --- a/README.md +++ b/README.md @@ -19,4 +19,20 @@ You can try grabbing (closing the end effector) by pressing 'g'. You can try out the basic step function 's' or swing function 'x'. -You can import the `run_demo` function for your downstream demo needs. \ No newline at end of file +You can import the `run_demo` function for your downstream demo needs. + +## About `tycho_demo` + +### Mode and commands + +Depending on the value of `state.mode`, the demo interface will invoke a callback (specified by `state.modes`) to get the current command, consisting of a position and velocity setpoint. +To omit one of the setpoints (i.e. set position but not velocity setpoints) return a list of `None`s. + +### Pre and Post command hooks + +You can specify callbacks to be invoked before and after the mode is invoked to get the command, in `state.pre_command_hooks` and `state.post_command_hooks`. +- `pre_command_hooks` are invoked before calling the mode. These callbacks can be used to poll sensors and populate the `state.info` dict with relevant state information. +- `post_command_hooks` are invoked after calling the mode. These callbacks can be used to save the results of actions, i.e. for logging purposes. + +Both `state.pre_command_hooks` and `state.post_command_hooks` are of type `Dict[str, List[Callable[[State], None]]]`, mapping the mode name to a list of callbacks. +This means that callbacks are invoked for the specified mode. To set a callback to be invoked for any mode, set the key as `"*"`, which is the wildcard. diff --git a/tycho_demo/demo_interface.py b/tycho_demo/demo_interface.py index 18787d5..8e929fb 100644 --- a/tycho_demo/demo_interface.py +++ b/tycho_demo/demo_interface.py @@ -61,8 +61,7 @@ def __init__(self, arm, gains_file): self.modes: Dict[str, Callable[[State, float], tuple[np.ndarray, np.ndarray]]] = {} self.handlers: Dict[str, Callable[[str, State], None]] = {} self.onclose: List[Callable[[State], None]] = [] - # invoked with (state, prev_mode) - self.mode_change_hooks: List[Callable[[State, str], None]] = [] + # The following hooks are called from the command thread before and after querying the mode callback self.pre_command_hooks: Dict[str, List[Callable[[State], None]]] = defaultdict(list) self.post_command_hooks: Dict[str, List[Callable[[State], None]]] = defaultdict(list) self.info = {} # everything here must be pickleable @@ -274,12 +273,6 @@ def command_proc(state: State): fn(state) state.info["target_position"] = command_pos - with state._mutex: - new_mode = state.mode - if new_mode != current_mode: - for fn in state.mode_change_hooks: - fn(state, current_mode) - state.lock() # Check for IK jump, apply smoother, and send out command @@ -447,17 +440,10 @@ def run_demo(callback_func=None, params=None, recorded_topics=[], cmd_freq=0): while res != 'q' and not state.quit: print_and_cr('') if res in state.handlers_keys: - with state._mutex: - prev_mode = state.mode try: state.handlers[res](res, state) except Exception as e: print_and_cr(colors.bg.red + str(e) + colors.reset) - with state._mutex: - new_mode = state.mode - if new_mode != prev_mode: - for fn in state.mode_change_hooks: - fn(state, prev_mode) sleep(0.01) res = getch() diff --git a/tycho_demo/utils/util.py b/tycho_demo/utils/util.py index 6af34e0..3a7588a 100644 --- a/tycho_demo/utils/util.py +++ b/tycho_demo/utils/util.py @@ -2,6 +2,19 @@ from typing import Generator, Dict, Any def read_raw_log(path: str) -> Generator[Dict[Any, list], None, None]: + """ + Reads a raw log file and yields each entry. + + Args: + path (str): The path to the raw log file. + + Yields: + dict: An individual log entry, the value of `state.info`. + + Raises: + FileNotFoundError: If the specified file path does not exist. + + """ with open(path, "rb") as f: reader = Unpickler(f) try: From a76094a5c5d79fa7b1fbb763bbef3322bac3642d Mon Sep 17 00:00:00 2001 From: Abhay Deshpande Date: Wed, 5 Jun 2024 20:24:34 -0700 Subject: [PATCH 6/8] Rename ros record to ros subscribe, minor cleanup --- tycho_demo/addon/__init__.py | 2 +- tycho_demo/addon/{ros_record.py => ros_subscribe.py} | 2 +- tycho_demo/demo_interface.py | 8 ++++---- tycho_demo/parse_log.py | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) rename tycho_demo/addon/{ros_record.py => ros_subscribe.py} (92%) diff --git a/tycho_demo/addon/__init__.py b/tycho_demo/addon/__init__.py index fdbcc0a..6f3a590 100644 --- a/tycho_demo/addon/__init__.py +++ b/tycho_demo/addon/__init__.py @@ -2,7 +2,7 @@ from .moving import add_moving_function from .logger import add_logger_function from .replay import add_replay_function -from .ros_record import add_ros_record_function +from .ros_subscribe import add_ros_subscribe_function from .tuning import add_tuning_function from .visualize import add_visualize_function from .grabbing import add_grabbing_function diff --git a/tycho_demo/addon/ros_record.py b/tycho_demo/addon/ros_subscribe.py similarity index 92% rename from tycho_demo/addon/ros_record.py rename to tycho_demo/addon/ros_subscribe.py index 3f5008c..96e9f57 100644 --- a/tycho_demo/addon/ros_record.py +++ b/tycho_demo/addon/ros_subscribe.py @@ -16,7 +16,7 @@ def callback(msg): state.topic_subs[topic] = sub -def add_ros_record_function(state, recorded_topics: List[str]): +def add_ros_subscribe_function(state, recorded_topics: List[str]): state.topic_subs = {} state.rostopic_mutex = Lock() state.latest_ros_data = {} diff --git a/tycho_demo/demo_interface.py b/tycho_demo/demo_interface.py index 8e929fb..aacd0f4 100644 --- a/tycho_demo/demo_interface.py +++ b/tycho_demo/demo_interface.py @@ -29,7 +29,7 @@ # Local from tycho_demo.keyboard import getch -from tycho_demo.addon import add_snapping_function, add_ros_record_function, add_logger_function +from tycho_demo.addon import add_snapping_function, add_ros_subscribe_function, add_logger_function # Feedback frequency (Hz) ROBOT_FEEDBACK_FREQUENCY = 100 # How often to pull sensor info @@ -263,8 +263,8 @@ def command_proc(state: State): # Generating command t = time() state.info["curr_time"] = t - state.info["curr_position"] = state.current_position - state.info["curr_choppose"] = construct_choppose(state.arm, state.current_position) + state.info["joint_pos"] = state.current_position + state.info["robot_pose"] = construct_choppose(state.arm, state.current_position) assert current_mode in state.mode_keys for fn in state.pre_command_hooks["*"] + state.pre_command_hooks[current_mode]: fn(state) @@ -418,7 +418,7 @@ def run_demo(callback_func=None, params=None, recorded_topics=[], cmd_freq=0): state.counter_skip_freq = round(ROBOT_FEEDBACK_FREQUENCY / cmd_freq) add_snapping_function(state) - add_ros_record_function(state, recorded_topics) + add_ros_subscribe_function(state, recorded_topics) add_logger_function(state) # Caller install custom handlers diff --git a/tycho_demo/parse_log.py b/tycho_demo/parse_log.py index 96390fe..1c11284 100644 --- a/tycho_demo/parse_log.py +++ b/tycho_demo/parse_log.py @@ -1,6 +1,6 @@ import argparse import pickle -from typing import Any, Dict, List, Generator +from typing import Any, Dict, List import numpy as np from tycho_demo.utils import read_raw_log From ea72e683bd89a26375f9a9f8e289b616b4202b89 Mon Sep 17 00:00:00 2001 From: Abhay Deshpande Date: Wed, 5 Jun 2024 20:27:08 -0700 Subject: [PATCH 7/8] Add comments --- tycho_demo/demo_interface.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tycho_demo/demo_interface.py b/tycho_demo/demo_interface.py index aacd0f4..5e013a7 100644 --- a/tycho_demo/demo_interface.py +++ b/tycho_demo/demo_interface.py @@ -58,8 +58,11 @@ def __init__(self, arm, gains_file): self.res_estimator = None # modes and hooks + # Invoked on the control thread to get the current command (position and velocity setpoints for each joint) self.modes: Dict[str, Callable[[State, float], tuple[np.ndarray, np.ndarray]]] = {} + # Invoked on the control thread when handling input self.handlers: Dict[str, Callable[[str, State], None]] = {} + # Invoked when shutting down the program self.onclose: List[Callable[[State], None]] = [] # The following hooks are called from the command thread before and after querying the mode callback self.pre_command_hooks: Dict[str, List[Callable[[State], None]]] = defaultdict(list) From 97858213a63e5f1e40cd5f0b94d4abb1f3edb9c7 Mon Sep 17 00:00:00 2001 From: Abhay Deshpande Date: Thu, 6 Jun 2024 18:05:52 -0700 Subject: [PATCH 8/8] Add comments --- tycho_demo/demo_interface.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tycho_demo/demo_interface.py b/tycho_demo/demo_interface.py index 5e013a7..d7bf2eb 100644 --- a/tycho_demo/demo_interface.py +++ b/tycho_demo/demo_interface.py @@ -420,8 +420,9 @@ def run_demo(callback_func=None, params=None, recorded_topics=[], cmd_freq=0): assert cmd_freq > 0, "Command frequency must be specified! (pass cmd_freq to run_demo())" state.counter_skip_freq = round(ROBOT_FEEDBACK_FREQUENCY / cmd_freq) + # Install default handlers BEFORE custom handlers + add_ros_subscribe_function(state, recorded_topics) # should be installed first add_snapping_function(state) - add_ros_subscribe_function(state, recorded_topics) add_logger_function(state) # Caller install custom handlers