# Source code for trifinger_simulation.trifinger_platform

import copy
import enum
import pickle
import numpy as np
import gym
from types import SimpleNamespace
import typing

from .tasks import move_cube, rearrange_dice
from .sim_finger import SimFinger, int_to_rgba
from . import camera, collision_objects, trifingerpro_limits


class ObjectType(enum.Enum):
    """Enumeration of supported object types."""

    NONE = 0
    COLORED_CUBE = 1
    DICE = 2


class ObjectPose:
    """A pure-python copy of trifinger_object_tracking::ObjectPose."""

    __slots__ = ["position", "orientation", "confidence"]

    def __init__(self):
        #: array: Position (x, y, z) of the object.  Units are meters.
        self.position = np.zeros(3)
        #: array: Orientation of the object as (x, y, z, w) quaternion.
        self.orientation = np.zeros(4)
        #: float: Estimate of the confidence for this pose observation.
        self.confidence = 1.0
class CameraObservation:
    """Pure-python copy of trifinger_cameras.camera.CameraObservation."""

    __slots__ = ["image", "timestamp"]

    def __init__(self):
        #: array: The image.
        self.image = None
        #: float: Timestamp when the image was received.
        self.timestamp = None
class TriCameraObservation:
    """Python version of trifinger_cameras::TriCameraObservation.

    This is a pure-python implementation of
    trifinger_cameras::TriCameraObservation, so we don't need to depend
    on trifinger_cameras here.
    """

    __slots__ = ["cameras"]

    def __init__(self):
        #: list of :class:`CameraObservation`: List of observations of cameras
        #: "camera60", "camera180" and "camera300" (in this order).
        self.cameras = [CameraObservation() for _ in range(3)]
class TriCameraObjectObservation:
    """Python version of trifinger_object_tracking::TriCameraObjectObservation.

    This is a pure-python implementation of
    trifinger_object_tracking::TriCameraObjectObservation, so we don't
    need to depend on trifinger_object_tracking here.
    """

    __slots__ = ["cameras", "object_pose", "filtered_object_pose"]

    def __init__(self):
        #: list of :class:`CameraObservation`: List of observations of cameras
        #: "camera60", "camera180" and "camera300" (in this order).
        self.cameras = [CameraObservation() for _ in range(3)]
        #: ObjectPose: Pose of the object in world coordinates.
        self.object_pose = ObjectPose()
        #: ObjectPose: Filtered pose of the object in world coordinates.  In
        #: simulation, this is the same as the unfiltered object_pose.
        self.filtered_object_pose = ObjectPose()
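
# ----------------------------------------------------------------------
# Illustrative sketch (not part of the original module): how the
# observation containers above are typically read.  In practice these
# objects are produced by TriFingerPlatform below; constructing one by
# hand as done here is only for illustration.
#
#     obs = TriCameraObjectObservation()
#     obs.cameras[0].image         # image of "camera60" (None until rendered)
#     obs.cameras[0].timestamp     # timestamp (None until filled in)
#     obs.object_pose.position     # np.ndarray (3,), position in meters
#     obs.object_pose.orientation  # np.ndarray (4,), (x, y, z, w) quaternion
# ----------------------------------------------------------------------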
class TriFingerPlatform:
    """
    Wrapper around the simulation providing the same interface as
    ``robot_interfaces::TriFingerPlatformFrontend``.

    The following methods of the robot_interfaces counterpart are not
    supported:

    - get_robot_status()
    - wait_until_timeindex()
    """

    # Create the action and observation spaces
    # ========================================

    spaces = SimpleNamespace()
    spaces.robot_torque = trifingerpro_limits.robot_torque
    spaces.robot_position = trifingerpro_limits.robot_position
    spaces.robot_velocity = trifingerpro_limits.robot_velocity
    spaces.object_position = trifingerpro_limits.object_position
    spaces.object_orientation = trifingerpro_limits.object_orientation

    # for convenience, we also create the respective gym spaces
    spaces.robot_torque.gym = gym.spaces.Box(
        low=spaces.robot_torque.low,
        high=spaces.robot_torque.high,
        dtype=np.float64,
    )
    spaces.robot_position.gym = gym.spaces.Box(
        low=spaces.robot_position.low,
        high=spaces.robot_position.high,
        dtype=np.float64,
    )
    spaces.robot_velocity.gym = gym.spaces.Box(
        low=spaces.robot_velocity.low,
        high=spaces.robot_velocity.high,
        dtype=np.float64,
    )
    spaces.object_position.gym = gym.spaces.Box(
        low=spaces.object_position.low,
        high=spaces.object_position.high,
        dtype=np.float64,
    )
    spaces.object_orientation.gym = gym.spaces.Box(
        low=spaces.object_orientation.low,
        high=spaces.object_orientation.high,
        dtype=np.float64,
    )
    def __init__(
        self,
        visualization: bool = False,
        initial_robot_position: typing.Optional[
            typing.Sequence[float]
        ] = None,
        initial_object_pose=None,
        enable_cameras: bool = False,
        time_step_s: float = 0.001,
        object_type: ObjectType = ObjectType.COLORED_CUBE,
        camera_delay_steps: int = 90,  # default based on real robot data
    ):
        """Initialize.

        Args:
            visualization: Set to true to run visualization.
            initial_robot_position: Initial robot joint angles.
            initial_object_pose: Only if ``object_type == COLORED_CUBE``:
                Initial pose for the manipulation object.  Can be any
                object with attributes ``position`` (x, y, z) and
                ``orientation`` (x, y, z, w).  If not set, a default pose
                is used.
            enable_cameras: Set to true to enable rendered images in the
                camera observations.  If false, camera observations are
                still available but the images will not be initialized.
                By default this is disabled, as rendering images takes a
                lot of computational power, so the cameras should only be
                enabled if the images are actually used.
            time_step_s: Simulation time step duration in seconds.
            object_type: Which type of object to load.  This also
                influences some other aspects: when using the cube, the
                camera observation will contain an attribute
                ``object_pose``.
            camera_delay_steps: Number of time steps by which camera
                observations are held back after they are generated.
                This is used to simulate the delay of the camera
                observation that occurs on the real system due to
                processing (mostly the object detection).
        """
        #: Camera rate in frames per second.  Observations of camera and
        #: object pose will only be updated with this rate.
        self.camera_rate_fps = 10

        #: Set to true to render camera observations
        self.enable_cameras = enable_cameras

        #: Simulation time step
        self._time_step = time_step_s

        # Camera delay in robot time steps
        self._camera_delay_steps = camera_delay_steps

        # Time step at which the next camera update is triggered
        self._next_camera_trigger_t = 0

        # Time step at which the last triggered camera update is ready
        # (used to simulate delay).
        self._next_camera_observation_ready_t: typing.Optional[int] = None

        # Initialize robot, object and cameras
        # ====================================

        self.simfinger = SimFinger(
            finger_type="trifingerpro",
            time_step=self._time_step,
            enable_visualization=visualization,
        )

        if initial_robot_position is None:
            initial_robot_position = self.spaces.robot_position.default

        self.simfinger.reset_finger_positions_and_velocities(
            initial_robot_position
        )

        if initial_object_pose is None:
            initial_object_pose = move_cube.Pose(
                position=self.spaces.object_position.default,
                orientation=self.spaces.object_orientation.default,
            )

        self._has_object_tracking = False
        self.object_type = object_type
        if object_type == ObjectType.COLORED_CUBE:
            self.cube = collision_objects.ColoredCubeV2(
                position=initial_object_pose.position,
                orientation=initial_object_pose.orientation,
                pybullet_client_id=self.simfinger._pybullet_client_id,
            )
            self._has_object_tracking = True
        elif object_type == ObjectType.DICE:
            die_mass = 0.012

            # use a random goal for initial positions
            initial_positions = rearrange_dice.sample_goal()
            self.dice = [
                collision_objects.Cube(
                    position=pos,
                    half_width=rearrange_dice.DIE_WIDTH / 2,
                    mass=die_mass,
                    color_rgba=int_to_rgba(0x0A7DCF),
                )
                for pos in initial_positions
            ]

        self.tricamera = camera.TriFingerCameras(
            pybullet_client_id=self.simfinger._pybullet_client_id
        )

        # Forward some methods for convenience
        # ====================================

        # forward "RobotFrontend" methods directly to simfinger
        self.Action = self.simfinger.Action
        self.get_desired_action = self.simfinger.get_desired_action
        self.get_applied_action = self.simfinger.get_applied_action
        self.get_timestamp_ms = self.simfinger.get_timestamp_ms
        self.get_current_timeindex = self.simfinger.get_current_timeindex
        self.get_robot_observation = self.simfinger.get_observation

        # forward kinematics directly to simfinger
        self.forward_kinematics = self.simfinger.kinematics.forward_kinematics

        # Initialize log
        # ==============
        self._action_log = {
            "initial_robot_position": copy.copy(initial_robot_position),
            "initial_object_pose": copy.copy(initial_object_pose),
            "actions": [],
        }

        # get initial camera observation
        self._delayed_camera_observation = (
            self._get_current_camera_observation(0)
        )
        self._camera_observation_t = self._delayed_camera_observation
    def get_time_step(self):
        """Get simulation time step in seconds."""
        return self._time_step

    def _compute_camera_update_step_interval(self) -> int:
        return round((1.0 / self.camera_rate_fps) / self._time_step)
    def append_desired_action(self, action):
        """
        Call :meth:`pybullet.SimFinger.append_desired_action` and add the
        action to the action log.

        Arguments/return value are the same as for
        :meth:`pybullet.SimFinger.append_desired_action`.
        """
        camera_triggered = self._camera_update()

        t = self.simfinger.append_desired_action(action)

        # The correct timestamp can only be acquired now that t is given.
        # Update it accordingly in the camera observations.
        if camera_triggered:
            camera_timestamp_s = self.get_timestamp_ms(t) / 1000
            for camera_ in self._delayed_camera_observation.cameras:
                camera_.timestamp = camera_timestamp_s

        # write the desired action to the log
        camera_obs = self.get_camera_observation(t)
        robot_obs = self.get_robot_observation(t)
        log_entry = {
            "t": t,
            "action": action,
            "robot_observation": robot_obs,
        }
        if self._has_object_tracking:
            log_entry["object_pose"] = camera_obs.object_pose
        # make a deep copy of log_entry to ensure all reference ties are cut
        self._action_log["actions"].append(copy.deepcopy(log_entry))

        return t
    def _camera_update(self) -> bool:
        # update camera and object observations only with the rate of the
        # cameras
        next_t = self.simfinger._t + 1

        # only trigger if no observation is still in work
        trigger_camera = (
            self._next_camera_observation_ready_t is None
        ) and (next_t >= self._next_camera_trigger_t)

        if trigger_camera:
            self._delayed_camera_observation = (
                self._get_current_camera_observation()
            )
            self._next_camera_trigger_t += (
                self._compute_camera_update_step_interval()
            )
            self._next_camera_observation_ready_t = (
                next_t + self._camera_delay_steps
            )

        is_camera_observation_ready = (
            self._next_camera_observation_ready_t is not None
        ) and (next_t >= self._next_camera_observation_ready_t)

        if is_camera_observation_ready:
            self._camera_observation_t = self._delayed_camera_observation
            self._next_camera_observation_ready_t = None

        return trigger_camera

    def _get_current_object_pose(self):
        assert self._has_object_tracking

        cube_state = self.cube.get_state()
        pose = ObjectPose()
        pose.position = np.asarray(cube_state[0])
        pose.orientation = np.asarray(cube_state[1])
        pose.confidence = 1.0

        return pose

    def _get_current_camera_observation(self, t=None):
        if self.enable_cameras:
            images = self.tricamera.get_images()
        else:
            images = [None] * 3

        if self._has_object_tracking:
            observation = TriCameraObjectObservation()
        else:
            observation = TriCameraObservation()

        # NOTE: The timestamp can only be set correctly after time step t
        # is actually reached.  Therefore, this is set to None here and
        # filled with the proper value later.
        if t is None:
            timestamp = None
        else:
            timestamp = self.get_timestamp_ms(t)

        for i, image in enumerate(images):
            observation.cameras[i].image = image
            observation.cameras[i].timestamp = timestamp

        if self._has_object_tracking:
            observation.object_pose = self._get_current_object_pose()
            observation.filtered_object_pose = self._get_current_object_pose()

        return observation
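
    # ------------------------------------------------------------------
    # Timing sketch (added note, using the constructor defaults): with
    # time_step_s=0.001 and camera_rate_fps=10,
    # _compute_camera_update_step_interval() returns
    # round((1.0 / 10) / 0.001) = 100, i.e. a new camera observation is
    # triggered every 100 robot steps.  With camera_delay_steps=90, an
    # observation triggered at step t only becomes visible via
    # get_camera_observation() from step t + 90 on, emulating the
    # processing delay of the real cameras.
    # ------------------------------------------------------------------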
    def get_camera_observation(
        self, t: int
    ) -> typing.Union[TriCameraObservation, TriCameraObjectObservation]:
        """Get camera observation at time step t.

        .. important::

           Actual images are only rendered if the class was created with
           ``enable_cameras=True`` in the constructor, otherwise the
           images in the observation are set to None.  Other fields are
           still valid, though.

        Args:
            t: The time index of the step for which the observation is
                requested.  Only the value returned by the last call of
                :meth:`~append_desired_action` is valid.

        Returns:
            Observations of the three cameras.  Depending on the object
            type used, this may also contain the object pose.

        Raises:
            ValueError: If an invalid time index ``t`` is passed.
        """
        current_t = self.simfinger._t

        if t < 0:
            raise ValueError("Cannot access time index less than zero.")
        elif t == current_t or t == current_t + 1:
            return self._camera_observation_t
        else:
            raise ValueError(
                "Given time index t has to match with index of the current"
                " step or the next one."
            )
    def store_action_log(self, filename):
        """Store the action log to a pickle file.

        Args:
            filename (str): Path to the file to which the log shall be
                written.  If the file exists already, it will be
                overwritten.
        """
        t = self.get_current_timeindex()
        camera_obs = self.get_camera_observation(t)
        if self._has_object_tracking:
            self._action_log["final_object_pose"] = {
                "t": t,
                "pose": camera_obs.object_pose,
            }

        with open(filename, "wb") as fh:
            pickle.dump(self._action_log, fh)
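
# ----------------------------------------------------------------------
# Minimal usage sketch (not part of the original module).  It assumes
# the package and pybullet are installed; the torque values and the log
# file name below are illustrative only.
if __name__ == "__main__":
    platform = TriFingerPlatform(enable_cameras=False)

    # apply a zero-torque action for one camera cycle
    # (100 steps at the default 1 ms time step and 10 fps camera rate)
    zero_torque_action = platform.Action(torque=np.zeros(9))
    for _ in range(100):
        t = platform.append_desired_action(zero_torque_action)

    # robot and camera observations for the current time index
    robot_observation = platform.get_robot_observation(t)
    camera_observation = platform.get_camera_observation(t)
    print("joint positions:", robot_observation.position)

    # write the accumulated action log to a pickle file
    platform.store_action_log("/tmp/action_log.p")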