diff --git a/robot_skills/src/robot_skills/arms.py b/robot_skills/src/robot_skills/arms.py index 882aaf4bcf..7ef5285344 100644 --- a/robot_skills/src/robot_skills/arms.py +++ b/robot_skills/src/robot_skills/arms.py @@ -1,4 +1,5 @@ import time +from typing import List # ROS import rospy @@ -17,6 +18,7 @@ from tue_msgs.msg import GripperCommand from robot_skills.robot_part import RobotPart +from robot_skills.utils.parameter_loading import load_param # Constants for arm requirements. Note that "don't care at all" is not here, as # it can be expressed by not imposing a requirement (set it to None). @@ -114,12 +116,14 @@ def send_joint_goal(self, configuration, timeout=5.0, max_joint_vel=0.7): # Joint trajectories def has_joint_trajectory(self, configuration): + # type: (str) -> bool """ Query whether the provided joint trajectory exists for the arm. """ return configuration in self._available_joint_trajectories - def send_joint_trajectory(self, configuration, timeout=5, max_joint_vel=0.7): + def send_joint_trajectory(self, configuration, timeout=5.0, max_joint_vel=0.7): + # type: (str, float, float) -> bool self._test_die(configuration in self._available_joint_trajectories, 'joint-goal ' + configuration, "Specify get_arm(..., required_trajectories=['{}'])".format(configuration)) return self._arm.send_joint_trajectory(configuration, timeout=timeout, @@ -318,6 +322,188 @@ class GripperState(object): OPEN = "open" CLOSE = "close" +class ArmPiece(object): + """ + Base class for arm pieces. + + :var robot_name: Name of the robot. + """ + def __init__(self, robot_name): + # type: (str) + self.robot_name = robot_name + + def load_param(self, param_name, default=None); + return load_param(self.robot_name, param_name, default) + +class ArmJointsPiece(ArmPiece): + """ + :var arm_joint_names: Names of all available arm joints. + :var torso_joint_names: Names of all available torso joints. + """ + def __init__(self, robot_name, arm_joints_suffix=""): + # type: (str, str) + """ + Constructor. + + :param robot_name: Name of the robot being used. + :param arm_joints_suffix: Suffix to append to names of arm joints. + """ + super(ArmJointsPiece, self).__init__(robot_name) + + joint_names = self.load_param('skills/arm/joint_names') + self.arm_joint_names = [name + arm_joints_suffix for name in joint_names] + self.torso_joint_names = self.load_param('skills/torso/joint_names') + + def get_arm_joint_names(self): + return self.arm_joint_names + + def get_torso_joint_names(self): + return self.torso_joint_names + +class HandoverPiece(object): + """ + Hand over an item between the arm and a human. + """ + def __init__(self, arm): + """ + :param arm: Arm to use for the exchange. + """ + self.robot_name = arm.robot_name + self.side = arm.side + + # Copy interface to the supplied arm. + arm.handover_to_human = self.handover_to_human + arm.handover_to_robot = self.handover_to_robot + + def handover_to_human(self, timeout=10): + # type: (float) -> bool + """ + Handover an item from the gripper to a human. + + Feels if user slightly pulls or pushes (the item in) the arm. On timeout, it will return False. + :param timeout: timeout in seconds + :return: True or False + """ + return self._exchange_with_human(False, timeout) + + def handover_to_robot(self, timeout=10): + # type: (float) -> bool + """ + Handover an item from a human to the robot. + + Feels if user slightly pushes an item in the gripper. On timeout, it will return False. + :param timeout: timeout in seconds + :return: True or False + """ + return self._exchange_with_human(True, timeout) + + def _exchange_with_human(self, to_robot, timeout=10.0): + # type: (bool, float) -> bool + """ + Exchange an item between the arm and a human. + + If 'to_robot', it feels if user slightly pushes an item in the gripper. + If not 'to_robot', it feels if user slightly pulls or pushes (the item in) the arm. + + :param to_robot: Direction of the exchange. + :param timeout: Timeout in seconds. + :return: Whether the exchange succeeded. + """ + if to_robot: + topic = '/{}/handoverdetector_{}/toggle_human2robot'.format(self.robot_name, self.side) + else: + topic = '/{}/handoverdetector_{}/toggle_robot2human'.format(self.robot_name, self.side) + + pub = rospy.Publisher(topic, std_msgs.msg.Bool, queue_size=1, latch=True) + pub.publish(std_msgs.msg.Bool(True)) + + result_topic = '/{}/handoverdetector_{}/result'.format(self.robot_name, self.side) + try: + rospy.wait_for_message(result_topic, std_msgs.msg.Bool, timeout) + return True + except rospy.ROSException as e: + rospy.logerr(e) + return False + +class ArmDatabasePiece(ArmPiece): + """ + High level knowledge about arm properties. + + :var goals: Default poses of the arm. + :var trajectories: Default trajectories of the arm. + """ + def __init__(self, robot_name): + # type: (str) + super(ArmDatabasePiece, self).__init__(robot_name) + + self.goals = self.load_param('skills/arm/default_configurations') + self.trajectories = self.load_param('skills/arm/default_trajectories') + + def get_goal(self, configuration): + # type: (str) + """ + Get an arm goal by name. + + :param configuration: Name of the configuration to retrieve. + :return: Configuration for the arm, or None if the requested configuration does not exist. + """ + return self.goals.get(configuration) + + def get_trajectory(self, configuration): + # type: (str) + """ + Get an arm trajectory by name. + + :param configuration: Name of the trajectory to retrieve. + :return: Trajectory for the arm, or None if the requested trajectory does not exist. + """ + return self.trajectories.get(configuration) + +class ArmMarkerPublisherPiece(ArmPiece): + """ + Piece for publishing a marker. + """ + def __init__(self, robot_name, side): + # type: (str, str) + """ + Constructor. + + :param robot_name: Name of the robot being used. + :param side: Name of the arm. + """ + super(ArmMarkerPublisherPiece).__init__(robot_name) + + topic = "/{}/{}_arm/grasp_target".format(robot_name, side) + self._marker_publisher = rospy.Publisher(topic, visualization_msgs.msg.Marker, queue_size=10) + + def publish_marker(self, goal, color, ns=""): + # type: (GraspPrecomputeGoal, List[float], str) -> None + """ + Publish markers for visualisation + + :param goal: tue_manipulation_msgs.msg.GraspPrecomputeGoal + :param color: Color of the marker (0.0-1.0) + :param ns: Namespace + """ + marker = visualization_msgs.msg.Marker() + marker.header.frame_id = goal.goal.header.frame_id + marker.header.stamp = rospy.Time.now() + marker.type = 2 + marker.pose.position.x = goal.goal.x + marker.pose.position.y = goal.goal.y + marker.pose.position.z = goal.goal.z + marker.lifetime = rospy.Duration(20.0) + marker.scale.x = 0.05 + marker.scale.y = 0.05 + marker.scale.z = 0.05 + marker.ns = ns + + marker.color.a = 1 + marker.color.r = color[0] + marker.color.g = color[1] + marker.color.b = color[2] + + self._marker_publisher.publish(marker) class Arm(RobotPart): """ @@ -331,8 +517,10 @@ class Arm(RobotPart): #To open left gripper >>> left.send_gripper_goal_open(10) # doctest: +SKIP + + :var arm_database: High level knowledge about arm properties. """ - def __init__(self, robot_name, tf_listener, get_joint_states, side): + def __init__(self, robot_name, tf_listener, get_joint_states, side, joints_piece=None): """ constructor @@ -340,6 +528,9 @@ def __init__(self, robot_name, tf_listener, get_joint_states, side): :param tf_listener: tf_server.TFClient() :param get_joint_states: get_joint_states function for getting the last joint states :param side: left or right + :vartype side: str + :param joints_piece If specified, arm part containing the arm and torso joint names. + :vartype joint: ArmJointsPiece """ super(Arm, self).__init__(robot_name=robot_name, tf_listener=tf_listener) self.side = side @@ -359,12 +550,14 @@ def __init__(self, robot_name, tf_listener, get_joint_states, side): go = self.load_param('skills/arm/' + self.side + '/base_offset') self._base_offset = kdl.Vector(go["x"], go["y"], go["z"]) - self.joint_names = self.load_param('skills/arm/joint_names') - self.joint_names = [name + "_" + self.side for name in self.joint_names] - self.torso_joint_names = self.load_param('skills/torso/joint_names') + if joints_piece: + self._joints_part = joints_piece + else: + self._joints_part = ArmJointsPiece(self.robot_name, suffix="_" + side) - self.default_configurations = self.load_param('skills/arm/default_configurations') - self.default_trajectories = self.load_param('skills/arm/default_trajectories') + self.arm_database = ArmDatabasePiece(self.robot_name) + self._marker_publisher = ArmMarkerPublisherPiece(self.robot_name, self.side) + self._handover = HandoverPiece(self) # listen to the hardware status to determine if the arm is available self.subscribe_hardware_status(self.side + '_arm') @@ -386,11 +579,6 @@ def __init__(self, robot_name, tf_listener, get_joint_states, side): rospy.Subscriber("/" + self.robot_name + "/" + self.side + "_arm/proximity_sensor", std_msgs.msg.Float32MultiArray, self._grasp_sensor_callback) - # Init marker publisher - self._marker_publisher = rospy.Publisher( - "/" + robot_name + "/" + self.side + "_arm/grasp_target", - visualization_msgs.msg.Marker, queue_size=10) - self.get_joint_states = get_joint_states def collect_gripper_types(self, gripper_type): @@ -434,7 +622,7 @@ def has_joint_goal(self, configuration): :param configuration: name of jint goal to check. :return: Whether the joint goal is available. """ - return configuration in self.default_configurations + return self.arm_database.get_goal(configuration) is not None def has_joint_trajectory(self, configuration): """ @@ -443,7 +631,7 @@ def has_joint_trajectory(self, configuration): :param configuration: name of jint trajectory to check. :return: Whether the joint trajectory is available. """ - return configuration in self.default_trajectories + return self.arm_database.get_trajectory(configuration) is not None def cancel_goals(self): """ @@ -501,41 +689,18 @@ def send_goal(self, frameStamped, timeout=30, pre_grasp=False, first_joint_pos_o # TODO: Get rid of this custom message type # Create goal: - grasp_precompute_goal = GraspPrecomputeGoal() - grasp_precompute_goal.goal.header.frame_id = frame_in_baselink.frame_id - grasp_precompute_goal.goal.header.stamp = rospy.Time.now() - - grasp_precompute_goal.PERFORM_PRE_GRASP = pre_grasp - grasp_precompute_goal.FIRST_JOINT_POS_ONLY = first_joint_pos_only - - grasp_precompute_goal.allowed_touch_objects = allowed_touch_objects - - grasp_precompute_goal.goal.x = frame_in_baselink.frame.p.x() - grasp_precompute_goal.goal.y = frame_in_baselink.frame.p.y() - grasp_precompute_goal.goal.z = frame_in_baselink.frame.p.z() - - roll, pitch, yaw = frame_in_baselink.frame.M.GetRPY() - grasp_precompute_goal.goal.roll = roll - grasp_precompute_goal.goal.pitch = pitch - grasp_precompute_goal.goal.yaw = yaw - - self._publish_marker(grasp_precompute_goal, [1, 0, 0], "grasp_point") + grasp_goal = make_grasp_precompute_goal(frame_in_baselink, pre_grasp, first_joint_pos_only, + allowed_touch_objects) + self._marker_publisher.publish_marker(grasp_goal, [1, 0, 0], "grasp_point") # Add tunable parameters offset_frame = frame_in_baselink.frame * self.offset + set_grasp_goal_position(grasp_goal, offset_frame.p) + set_grasp_goal_orientation(grasp_goal, frame_in_baselink.frame.M) - grasp_precompute_goal.goal.x = offset_frame.p.x() - grasp_precompute_goal.goal.y = offset_frame.p.y() - grasp_precompute_goal.goal.z = offset_frame.p.z() - - roll, pitch, yaw = frame_in_baselink.frame.M.GetRPY() - grasp_precompute_goal.goal.roll = roll - grasp_precompute_goal.goal.pitch = pitch - grasp_precompute_goal.goal.yaw = yaw - - # rospy.loginfo("Arm goal: {0}".format(grasp_precompute_goal)) + # rospy.loginfo("Arm goal: {0}".format(grasp_goal)) - self._publish_marker(grasp_precompute_goal, [0, 1, 0], "grasp_point_corrected") + self._marker_publisher.publish_marker(grasp_goal, [0, 1, 0], "grasp_point_corrected") time.sleep(0.001) # This is necessary: the rtt_actionlib in the hardware seems # to only have a queue size of 1 and runs at 1000 hz. This @@ -548,20 +713,20 @@ def send_goal(self, frameStamped, timeout=30, pre_grasp=False, first_joint_pos_o # Send goal: if timeout == 0.0: - self._ac_grasp_precompute.send_goal(grasp_precompute_goal) + self._ac_grasp_precompute.send_goal(grasp_goal) return True else: result = self._ac_grasp_precompute.send_goal_and_wait( - grasp_precompute_goal, + grasp_goal, execute_timeout=rospy.Duration(timeout)) if result == GoalStatus.SUCCEEDED: result_pose = self.tf_listener.lookupTransform(self.robot_name + "/base_link", self.robot_name + "/grippoint_{}".format(self.side), rospy.Time(0)) - dx = grasp_precompute_goal.goal.x - result_pose[0][0] - dy = grasp_precompute_goal.goal.y - result_pose[0][1] - dz = grasp_precompute_goal.goal.z - result_pose[0][2] + dx = grasp_goal.goal.x - result_pose[0][0] + dy = grasp_goal.goal.y - result_pose[0][1] + dz = grasp_goal.goal.z - result_pose[0][2] if abs(dx) > 0.005 or abs(dy) > 0.005 or abs(dz) > 0.005: rospy.logwarn("Grasp-precompute error too large: [{}, {}, {}]".format( @@ -573,6 +738,7 @@ def send_goal(self, frameStamped, timeout=30, pre_grasp=False, first_joint_pos_o return False def send_joint_goal(self, configuration, timeout=5.0, max_joint_vel=0.7): + # type: (str, float, float) -> bool """ Send a named joint goal (pose) defined in the parameter default_configurations to the arm :param configuration:(str) name of configuration, configuration should be loaded as parameter @@ -580,8 +746,9 @@ def send_joint_goal(self, configuration, timeout=5.0, max_joint_vel=0.7): :param max_joint_vel:(int,float,[int]) speed the robot can have when getting to the desired configuration :return: True or False, False in case of nonexistent configuration or failed execution """ - if configuration in self.default_configurations: - return self._send_joint_trajectory([self.default_configurations[configuration]], + arm_configuration = self.arm_database.get_goal(configuration) + if arm_configuration: + return self._send_joint_trajectory(arm_configuration, timeout=rospy.Duration.from_sec(timeout), max_joint_vel=max_joint_vel) else: @@ -589,6 +756,7 @@ def send_joint_goal(self, configuration, timeout=5.0, max_joint_vel=0.7): return False def send_joint_trajectory(self, configuration, timeout=5.0, max_joint_vel=0.7): + # type: (str, float, float) -> bool """ Send a named joint trajectory (sequence of poses) defined in the default_trajectories to the arm @@ -597,8 +765,9 @@ def send_joint_trajectory(self, configuration, timeout=5.0, max_joint_vel=0.7): :param max_joint_vel:(int,float,[int]) speed the robot can have when getting to the desired configuration :return: True or False, False in case of nonexistent configuration or failed execution """ - if configuration in self.default_trajectories: - return self._send_joint_trajectory(self.default_trajectories[configuration], + arm_trajectory = self.arm_database.get_trajectory(configuration) + if arm_trajectory: + return self._send_joint_trajectory(arm_trajectory, timeout=rospy.Duration.from_sec(timeout), max_joint_vel=max_joint_vel) else: @@ -672,48 +841,6 @@ def send_gripper_goal(self, state, timeout=5.0, max_torque=0.1): return goal_status == GoalStatus.SUCCEEDED - def handover_to_human(self, timeout=10): - """ - Handover an item from the gripper to a human. - - Feels if user slightly pulls or pushes (the item in) the arm. On timeout, it will return False. - :param timeout: timeout in seconds - :return: True or False - """ - pub = rospy.Publisher('/'+self.robot_name+'/handoverdetector_'+self.side+'/toggle_robot2human', - std_msgs.msg.Bool, queue_size=1, latch=True) - pub.publish(std_msgs.msg.Bool(True)) - - try: - rospy.wait_for_message('/'+self.robot_name+'/handoverdetector_'+self.side+'/result', std_msgs.msg.Bool, - timeout) - # print('/'+self.robot_name+'/handoverdetector_'+self.side+'/result') - return True - except rospy.ROSException as e: - rospy.logerr(e) - return False - - def handover_to_robot(self, timeout=10): - """ - Handover an item from a human to the robot. - - Feels if user slightly pushes an item in the gripper. On timeout, it will return False. - :param timeout: timeout in seconds - :return: True or False - """ - pub = rospy.Publisher('/'+self.robot_name+'/handoverdetector_'+self.side+'/toggle_human2robot', - std_msgs.msg.Bool, queue_size=1, latch=True) - pub.publish(std_msgs.msg.Bool(True)) - - try: - rospy.wait_for_message('/'+self.robot_name+'/handoverdetector_'+self.side+'/result', std_msgs.msg.Bool, - timeout) - # print('/'+self.robot_name+'/handoverdetector_'+self.side+'/result') - return True - except rospy.ROSException as e: - rospy.logerr(e) - return False - def _send_joint_trajectory(self, joints_references, max_joint_vel=0.7, timeout=rospy.Duration(5)): """ Low level method that sends a array of joint references to the arm. @@ -734,10 +861,12 @@ def _send_joint_trajectory(self, joints_references, max_joint_vel=0.7, timeout=r if not joints_references: return False - if len(joints_references[0]) == len(self.joint_names) + len(self.torso_joint_names): - joint_names = self.torso_joint_names + self.joint_names + arm_joint_names = self._joints_part.get_arm_joint_names() + torso_joint_names = self._joints_part.get_torso_joint_names() + if len(joints_references[0]) == len(arm_joint_names) + len(torso_joint_names): + joint_names = torso_joint_names + arm_joint_names else: - joint_names = self.joint_names + joint_names = arm_joint_names if isinstance(max_joint_vel, (float, int)): max_joint_vel = [max_joint_vel]*len(joint_names) @@ -763,8 +892,7 @@ def _send_joint_trajectory(self, joints_references, max_joint_vel=0.7, timeout=r positions=joints_reference, time_from_start=rospy.Duration.from_sec(time_from_start))) - joint_trajectory = JointTrajectory(joint_names=joint_names, - points=ps) + joint_trajectory = JointTrajectory(joint_names=joint_names, points=ps) goal = FollowJointTrajectoryGoal(trajectory=joint_trajectory, goal_time_tolerance=timeout) rospy.logdebug("Send {0} arm to jointcoords \n{1}".format(self.side, ps)) @@ -861,40 +989,62 @@ def base_offset(self): """ return self._base_offset - def _publish_marker(self, goal, color, ns=""): - """ - Publish markers for visualisation - :param goal: tue_manipulation_msgs.msg.GraspPrecomputeGoal - :param color: list of rgb colors (0.0-1.0) - :param ns: namespace - :return: no return - """ - marker = visualization_msgs.msg.Marker() - marker.header.frame_id = goal.goal.header.frame_id - marker.header.stamp = rospy.Time.now() - marker.type = 2 - marker.pose.position.x = goal.goal.x - marker.pose.position.y = goal.goal.y - marker.pose.position.z = goal.goal.z - marker.lifetime = rospy.Duration(20.0) - marker.scale.x = 0.05 - marker.scale.y = 0.05 - marker.scale.z = 0.05 - marker.ns = ns + def __repr__(self): + return "Arm(side='{side}')".format(side=self.side) - marker.color.a = 1 - marker.color.r = color[0] - marker.color.g = color[1] - marker.color.b = color[2] +def make_grasp_precompute_goal(frame_in_baselink, pre_grasp, first_joint_pos_only, + allowed_touch_objects): + """ + Construct a grasp precompute goal. + + :param frame_in_baselink: + :param first_joint_pos_only: Bool to only execute first joint position of + whole trajectory. + :param allowed_touch_objects: List of object names in the worldmodel, which + are allowed to be touched. + :return: The constructed precompute goal. + """ + # TODO: Get rid of this custom message type + # Create goal: + grasp_goal = GraspPrecomputeGoal() + grasp_goal.goal.header.frame_id = frame_in_baselink.frame_id + grasp_goal.goal.header.stamp = rospy.Time.now() - self._marker_publisher.publish(marker) + grasp_goal.PERFORM_PRE_GRASP = pre_grasp + grasp_goal.FIRST_JOINT_POS_ONLY = first_joint_pos_only - def __repr__(self): - return "Arm(side='{side}')".format(side=self.side) + grasp_goal.allowed_touch_objects = allowed_touch_objects + + set_grasp_goal_position(grasp_goal, frame_in_baselink.frame.p) + set_grasp_goal_orientation(grasp_goal, frame_in_baselink.frame.M) + return grasp_goal + +def set_grasp_goal_position(grasp_goal, pos): + """ + Set the position of the grasp goal. + + :param grasp_goal: Grasp goal to change, modified in-place. + :param pos: Position to set. + """ + grasp_goal.goal.x = pos.x() + grasp_goal.goal.y = pos.y() + grasp_goal.goal.z = pos.z() + +def set_grasp_goal_orientation(grasp_goal, ori): + """ + Set the orientation of the grasp goal. + + :param grasp_goal: Grasp goal to change, modified in-place. + :param ori: Orientation to set. + """ + roll, pitch, yaw = ori.GetRPY() + grasp_goal.goal.roll = roll + grasp_goal.goal.pitch = pitch + grasp_goal.goal.yaw = yaw class ForceSensingArm(Arm): - def __init__(self, robot_name, tf_listener, get_joint_states, side): + def __init__(self, robot_name, tf_listener, get_joint_states, side, joints_piece=None): """ constructor @@ -904,8 +1054,10 @@ def __init__(self, robot_name, tf_listener, get_joint_states, side): :param tf_listener: tf_server.TFClient() :param get_joint_states: get_joint_states function for getting the last joint states :param side: left or right + :param joints_piece: If specified, arm part containing the arm and torso joint names. + """ - super(ForceSensingArm, self).__init__(robot_name, tf_listener, get_joint_states, side) + super(ForceSensingArm, self).__init__(robot_name, tf_listener, get_joint_states, side, joints_piece) self.force_sensor = ForceSensor(robot_name, tf_listener, "/" + robot_name + "/wrist_wrench/raw") @@ -931,16 +1083,17 @@ def move_down_until_force_sensor_edge_up(self, timeout=10, retract_distance=0.01 self._ac_joint_traj.send_goal(self._make_goal(current_joint_state, 0.5)) def _make_goal(self, current_joint_state, timeout): - positions = [current_joint_state[n] for n in self.joint_names] + arm_joint_names = self._joints_part.get_arm_joint_names() + positions = [current_joint_state[n] for n in arm_joint_names] points = [JointTrajectoryPoint(positions=positions, time_from_start=rospy.Duration.from_sec(timeout))] - trajectory = JointTrajectory(joint_names=self.joint_names, points=points) + trajectory = JointTrajectory(joint_names=arm_joint_names, points=points) return FollowJointTrajectoryGoal(trajectory=trajectory) class FakeArm(RobotPart): - def __init__(self, robot_name, tf_listener, side): - super(FakeArm, self).__init__(robot_name=robot_name, tf_listener=tf_listener) + def __init__(self, robot_name, tf_listener, side, joints_piece=None): + super(FakeArm, self).__init__(robot_name=robot_name, tf_listener=tf_listener, joints_piece) self.side = side if (self.side is "left") or (self.side is "right"): pass @@ -956,12 +1109,12 @@ def __init__(self, robot_name, tf_listener, side): self.marker_to_grippoint_offset = self.load_param('skills/arm/offset/marker_to_grippoint') - self.joint_names = self.load_param('skills/arm/joint_names') - self.joint_names = [name + "_" + self.side for name in self.joint_names] - self.torso_joint_names = self.load_param('skills/torso/joint_names') + if joints_piece: + self._joints_part = joints_piece + else: + self._joints_part = ArmJointsPiece(self.robot_name, suffix="_" + side) - self.default_configurations = self.load_param('skills/arm/default_configurations') - self.default_trajectories = self.load_param('skills/arm/default_trajectories') + self.arm_database = ArmDatabasePiece(self.robot_name) @property def operational(self): diff --git a/robot_skills/src/robot_skills/hero.py b/robot_skills/src/robot_skills/hero.py index 3b941ce0bf..d8880b0658 100644 --- a/robot_skills/src/robot_skills/hero.py +++ b/robot_skills/src/robot_skills/hero.py @@ -14,9 +14,10 @@ def __init__(self, wait_services=False): self.add_body_part('base', base.Base(self.robot_name, self.tf_listener)) self.add_body_part('torso', torso.Torso(self.robot_name, self.tf_listener, self.get_joint_states)) + joints_piece = arms.ArmJointsPiece(self.robot_name) self.add_arm_part( 'leftArm', - arms.ForceSensingArm(self.robot_name, self.tf_listener, self.get_joint_states, "left") + arms.ForceSensingArm(self.robot_name, self.tf_listener, self.get_joint_states, "left", joints_piece=joints_piece) ) self.add_body_part('head', head.Head(self.robot_name, self.tf_listener)) @@ -44,9 +45,6 @@ def __init__(self, wait_services=False): # Reasoning/world modeling self.add_body_part('ed', world_model_ed.ED(self.robot_name, self.tf_listener)) - #rename joint names - self.parts['leftArm'].joint_names = self.parts['leftArm'].load_param('skills/arm/joint_names') - # These don't work for HSR because (then) Toyota's diagnostics aggregator makes the robot go into error somehow for part in self.parts.itervalues(): part.unsubscribe_hardware_status() diff --git a/robot_skills/src/robot_skills/robot_part.py b/robot_skills/src/robot_skills/robot_part.py index 8ade3e1581..17b48aab1d 100644 --- a/robot_skills/src/robot_skills/robot_part.py +++ b/robot_skills/src/robot_skills/robot_part.py @@ -7,6 +7,8 @@ import rospy import actionlib +from robot_skills.utils.parameter_loading import load_param + # Determine simulation mode SIM_MODE = os.environ.get("ROBOT_REAL", "false").lower() != "true" @@ -40,10 +42,7 @@ def load_param(self, param_name, default=None): :param default: default value for when parameter unavailable :return: loaded parameters """ - if default is None: - return rospy.get_param('/' + self.robot_name + '/' + param_name) - else: - return rospy.get_param('/' + self.robot_name + '/' + param_name, default) + return load_param(self.robot_name, param_name, default) def wait_for_connections(self, timeout, log_failing_connections=True): """ diff --git a/robot_skills/src/robot_skills/util/parameter_loading.py b/robot_skills/src/robot_skills/util/parameter_loading.py new file mode 100644 index 0000000000..420491dd3e --- /dev/null +++ b/robot_skills/src/robot_skills/util/parameter_loading.py @@ -0,0 +1,16 @@ +# ROS +import rospy + +def load_param(robot_name: str, param_name: str, default=None): + """ + Loads a parameter from the parameter server, namespaced by robot name + + :param robot_name: Name of the robot being used. + :param param_name: Parameter name. + :param default: Default value if parameter is unavailable. + :return: Parameter value (loaded or default). + """ + if default is None: + return rospy.get_param('/' + robot_name + '/' + param_name) + else: + return rospy.get_param('/' + robot_name + '/' + param_name, default)