Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Trajectory class #164

Merged
merged 8 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyrobosim/pyrobosim/navigation/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def execute(self, path, realtime_factor=1.0):
is_holding_object = self.robot.manipulated_object is not None
for i in range(traj_interp.num_points()):
start_time = time.time()
cur_pose = traj_interp.pose_at(i)
cur_pose = traj_interp.poses[i]
self.robot.set_pose(cur_pose)
if is_holding_object:
self.robot.manipulated_object.set_pose(cur_pose)
Expand Down
111 changes: 44 additions & 67 deletions pyrobosim/pyrobosim/utils/trajectory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,24 @@ class Trajectory:

def __init__(
self,
t_pts=np.array([]),
x_pts=np.array([]),
y_pts=np.array([]),
yaw_pts=np.array([]),
t_pts=[],
poses=[],
):
"""
Creates a Trajectory object instance

:param t_pts: Array of time trajectory points, in seconds.
:type t_pts: :class:`numpy.array`
:param x_pts: Array of X translation trajectory points, in meters.
:type x_pts: :class:`numpy.array`
:param y_pts: Array of Y translation trajectory points, in meters.
:type y_pts: :class:`numpy.array`
:param yaw_pts: Array of yaw trajectory points, in radians.
:type yaw_pts: :class:`numpy.array`
Creates a Trajectory object instance.

:param t_pts: List of time trajectory points, in seconds.
:type t_pts: list[float]
:param poses: List of poses that make up the trajectory.
:type poses: list[:class:`pyrobosim.utils.pose.Pose`]
"""
num_pts = len(t_pts)
if len(x_pts) != num_pts or len(y_pts) != num_pts or len(yaw_pts) != num_pts:
raise ValueError("All point arrays must have the same number of elements.")
if len(t_pts) != len(poses):
raise ValueError(
"Time points and poses must have the same number of elements."
)

self.t_pts = np.array(t_pts)
self.x_pts = np.array(x_pts)
self.y_pts = np.array(y_pts)
self.yaw_pts = np.array(yaw_pts)
self.poses = np.array(poses)

def num_points(self):
"""
Expand All @@ -57,54 +50,33 @@ def is_empty(self):
"""
return self.num_points() == 0

def pose_at(self, idx):
"""
Retrieves the trajectory point at the specified index as a pose.

:return: A Pose object corresponding to the trajectory point at that index.
:rtype: `pyrobosim.utils.pose.Pose`
"""
if self.is_empty():
warnings.warn("Trajectory is empty. Cannot delete point.")
return None
elif idx < 0 or idx >= self.num_points():
warnings.warn(
f"Invalid index {idx} for trajectory length {self.num_points()}"
)
return None
else:
return Pose(x=self.x_pts[idx], y=self.y_pts[idx], yaw=self.yaw_pts[idx])

def delete(self, idx):
"""
Deletes a trajectory point at the specified index.

:param idx: The index
:type idx: int
sea-bass marked this conversation as resolved.
Show resolved Hide resolved
:return: True if the point was successfully deleted, otherwise False.
"""
if self.is_empty():
warnings.warn("Trajectory is empty. Cannot delete point.")
return
return False
elif idx < 0 or idx >= self.num_points():
warnings.warn(
f"Invalid index {idx} for trajectory length {self.num_points()}"
)
return
return False
else:
self.t_pts = np.delete(self.t_pts, idx)
self.x_pts = np.delete(self.x_pts, idx)
self.y_pts = np.delete(self.y_pts, idx)
self.yaw_pts = np.delete(self.yaw_pts, idx)
self.poses = np.delete(self.poses, idx)
return True


def get_constant_speed_trajectory(path, linear_velocity=0.2, max_angular_velocity=None):
"""
Gets a trajectory from a path (list of Pose objects) by calculating
time points based on constant velocity and maximum angular velocity.

The trajectory is returned as a tuple of numpy arrays
(t_pts, x_pts, y_pts, theta_pts).

:param path: Path object from which to compute a trajectory.
:type path: :class:`pyrobosim.utils.motion.Path`
:param linear_velocity: Constant linear velocity in m/s, defaults to 0.2.
Expand All @@ -113,7 +85,7 @@ def get_constant_speed_trajectory(path, linear_velocity=0.2, max_angular_velocit
defaults to None.
:type max_angular_velocity: float, optional
:return: Constant speed trajectory.
:rtype: tuple(:class:`pyrobosim.utils.trajectory.Trajectory`)
:rtype: :class:`pyrobosim.utils.trajectory.Trajectory`
"""
if path.num_poses < 2:
warnings.warn("Insufficient points to generate trajectory.")
Expand All @@ -133,11 +105,7 @@ def get_constant_speed_trajectory(path, linear_velocity=0.2, max_angular_velocit
ang_time = ang_distance / max_angular_velocity
t_pts[idx + 1] = t_pts[idx] + max(lin_time, ang_time)

# Package up the trajectory
x_pts = np.array([p.x for p in path.poses])
y_pts = np.array([p.y for p in path.poses])
yaw_pts = np.array([p.get_yaw() for p in path.poses])
return Trajectory(t_pts, x_pts, y_pts, yaw_pts)
return Trajectory(t_pts, path.poses)


def interpolate_trajectory(traj: Trajectory, dt: float):
Expand All @@ -159,32 +127,41 @@ def interpolate_trajectory(traj: Trajectory, dt: float):

# De-duplicate time points ensure that Slerp doesn't throw an error.
# Right now, we're just keeping the later point.
i = 0
i = 1
modified_traj = copy.deepcopy(traj)
while i < modified_traj.num_points():
if (i > 0) and (modified_traj.t_pts[i] <= modified_traj.t_pts[i - 1]):
if modified_traj.t_pts[i] <= modified_traj.t_pts[i - 1]:
warnings.warn("De-duplicated trajectory points at the same time.")
modified_traj.delete(i - 1)
else:
i += 1

# Set up Slerp interpolation for the angle.
t_final = modified_traj.t_pts[-1]
if t_final > 0:
euler_angs = [[0, 0, th] for th in modified_traj.yaw_pts]
slerp = Slerp(modified_traj.t_pts, Rotation.from_euler("xyz", euler_angs))

# Package up the interpolated trajectory
t_interp = np.arange(0, t_final, dt)

# Interpolate the translation elements linearly
if t_final not in t_interp:
t_interp = np.append(t_interp, t_final)
x_interp = np.interp(t_interp, modified_traj.t_pts, modified_traj.x_pts)
y_interp = np.interp(t_interp, modified_traj.t_pts, modified_traj.y_pts)
x_interp = np.interp(
t_interp, modified_traj.t_pts, [pose.x for pose in modified_traj.poses]
)
y_interp = np.interp(
t_interp, modified_traj.t_pts, [pose.y for pose in modified_traj.poses]
)
z_interp = np.interp(
t_interp, modified_traj.t_pts, [pose.z for pose in modified_traj.poses]
)

# Set up Slerp interpolation for the angle.
if t_final > 0:
yaw_interp = np.array(
[slerp(t).as_euler("xyz", degrees=False)[2] for t in t_interp]
)
euler_angs = [pose.eul for pose in modified_traj.poses]
slerp = Slerp(modified_traj.t_pts, Rotation.from_euler("xyz", euler_angs))
eul_interp = [slerp(t).as_euler("xyz", degrees=False) for t in t_interp]
else:
yaw_interp = np.array([modified_traj.yaw_pts[-1]])
eul_interp = [modified_traj.poses[-1].eul]

return Trajectory(t_interp, x_interp, y_interp, yaw_interp)
# Package up the interpolated trajectory
poses_interp = []
for x, y, z, eul in zip(x_interp, y_interp, z_interp, eul_interp):
poses_interp.append(Pose(x=x, y=y, z=z, roll=eul[0], pitch=eul[1], yaw=eul[2]))
sea-bass marked this conversation as resolved.
Show resolved Hide resolved
return Trajectory(t_interp, poses_interp)
131 changes: 46 additions & 85 deletions test/utils/test_trajectory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,107 +21,77 @@ def test_create_empty_trajectory():


def test_create_trajectory():
traj = Trajectory(
np.array([0.0, 1.0, 2.0]), # Time points
np.array([0.1, 0.2, 0.3]), # X points
np.array([1.1, 1.2, 1.3]), # Y points
np.array([0.0, np.pi / 4, np.pi / 2]), # Yaw points
)
poses = [
Pose(x=0.1, y=1.1, yaw=0.0),
Pose(x=0.2, y=1.2, yaw=np.pi / 4),
Pose(x=0.3, y=1.3, yaw=np.pi / 2),
]
traj = Trajectory([0.0, 1.0, 2.0], poses)
assert traj.num_points() == 3
assert not traj.is_empty()

assert traj.pose_at(0).x == 0.1
assert traj.pose_at(0).y == 1.1
assert traj.pose_at(0).get_yaw() == 0.0

assert traj.pose_at(1).x == 0.2
assert traj.pose_at(1).y == 1.2
assert traj.pose_at(1).get_yaw() == np.pi / 4

assert traj.pose_at(2).x == 0.3
assert traj.pose_at(2).y == 1.3
assert traj.pose_at(2).get_yaw() == np.pi / 2

assert traj.poses[0].x == 0.1
assert traj.poses[0].y == 1.1
assert traj.poses[0].get_yaw() == 0.0

def test_pose_at_empty_trajectory():
traj = Trajectory()
with pytest.warns(UserWarning):
assert traj.pose_at(0) is None


def test_pose_at_invalid_indices():
traj = Trajectory(
np.array(
[
0.0,
1.0,
]
), # Time points
np.array([0.1, 0.2]), # X points
np.array([1.1, 1.2]), # Y points
np.array([0.0, np.pi / 4]), # Yaw points
)
assert traj.poses[1].x == 0.2
assert traj.poses[1].y == 1.2
assert traj.poses[1].get_yaw() == np.pi / 4

with pytest.warns(UserWarning):
assert traj.pose_at(-1) is None
assert traj.pose_at(5) is None
assert traj.poses[2].x == 0.3
assert traj.poses[2].y == 1.3
assert traj.poses[2].get_yaw() == np.pi / 2


def test_delete_empty_trajectory():
traj = Trajectory()
with pytest.warns(UserWarning):
traj.delete(0)
assert not traj.delete(0)


def test_delete_at_invalid_indices():
traj = Trajectory(
np.array(
[
0.0,
1.0,
]
), # Time points
np.array([0.1, 0.2]), # X points
np.array([1.1, 1.2]), # Y points
np.array([0.0, np.pi / 4]), # Yaw points
)
poses = [
Pose(x=0.1, y=1.1, yaw=0.0),
Pose(x=0.2, y=1.2, yaw=np.pi / 4),
Pose(x=0.3, y=1.3, yaw=np.pi / 2),
]
traj = Trajectory([0.0, 1.0, 2.0], poses)

with pytest.warns(UserWarning):
traj.delete(-1)
traj.delete(5)
assert not traj.delete(-1)
assert not traj.delete(5)

assert traj.num_points() == 2
assert traj.num_points() == 3


def test_delete():
traj = Trajectory(
np.array([0.0, 1.0, 2.0]), # Time points
np.array([0.1, 0.2, 0.3]), # X points
np.array([1.1, 1.2, 1.3]), # Y points
np.array([0.0, np.pi / 4, np.pi / 2]), # Yaw points
)
poses = [
Pose(x=0.1, y=1.1, yaw=0.0),
Pose(x=0.2, y=1.2, yaw=np.pi / 4),
Pose(x=0.3, y=1.3, yaw=np.pi / 2),
]
traj = Trajectory([0.0, 1.0, 2.0], poses)

traj.delete(1)

assert traj.num_points() == 2

assert traj.pose_at(0).x == 0.1
assert traj.pose_at(0).y == 1.1
assert traj.pose_at(0).get_yaw() == 0.0
assert traj.poses[0].x == 0.1
assert traj.poses[0].y == 1.1
assert traj.poses[0].get_yaw() == 0.0

assert traj.pose_at(1).x == 0.3
assert traj.pose_at(1).y == 1.3
assert traj.pose_at(1).get_yaw() == np.pi / 2
assert traj.poses[1].x == 0.3
assert traj.poses[1].y == 1.3
assert traj.poses[1].get_yaw() == np.pi / 2


def test_create_invalid_trajectory():
with pytest.raises(ValueError):
Trajectory(
np.array([0.0, 1.0]), # Time points
np.array([0.1, 0.2, 0.3]), # X points
np.array([1.1, 1.2, 1.3, 1.4]), # Y points
np.array([0.0, np.pi / 4, np.pi / 2]), # Yaw points
)
poses = [
Pose(x=0.1, y=1.1, yaw=0.0),
Pose(x=0.2, y=1.2, yaw=np.pi / 4),
Pose(x=0.3, y=1.3, yaw=np.pi / 2),
]
Trajectory([0.0, 10.0], poses)


def test_get_constant_speed_trajectory_empty_path():
Expand Down Expand Up @@ -149,9 +119,7 @@ def test_get_constant_speed_trajectory_unlimited_ang_vel():

assert traj.num_points() == 4
assert traj.t_pts == pytest.approx([0.0, 2.0, 4.0, 6.0], rel=1e-4)
assert traj.x_pts == pytest.approx([0.0, 1.0, 1.0, 0.0])
assert traj.y_pts == pytest.approx([0.0, 0.0, 1.0, 1.0])
assert traj.yaw_pts == pytest.approx([0.0, 0.0, np.pi / 2.0, -3.0 * np.pi / 4.0])
assert np.all(traj.poses == path.poses)


def test_get_constant_speed_trajectory_limited_ang_vel():
Expand All @@ -169,9 +137,7 @@ def test_get_constant_speed_trajectory_limited_ang_vel():

assert traj.num_points() == 4
assert traj.t_pts == pytest.approx([0.0, 2.0, 6.0, 12.0], rel=1e-4)
assert traj.x_pts == pytest.approx([0.0, 1.0, 1.0, 0.0])
assert traj.y_pts == pytest.approx([0.0, 0.0, 1.0, 1.0])
assert traj.yaw_pts == pytest.approx([0.0, 0.0, np.pi / 2.0, -3.0 * np.pi / 4.0])
assert np.all(traj.poses == path.poses)


def test_interpolate_trajectory():
Expand Down Expand Up @@ -209,12 +175,7 @@ def test_interpolate_trajectory_duplicate_points():


def test_interpolate_trajectory_insufficient_points():
traj = Trajectory(
(1.0,), # Time
(1.0,), # x points
(1.0,), # y points
(np.pi,), # yaw points
)
traj = Trajectory([1.0], [Pose()])
with pytest.warns(UserWarning):
interpolated_traj = interpolate_trajectory(traj, dt=0.1)
assert interpolated_traj is None
Loading