Replayer changes #12

Open · wants to merge 4 commits into main
5 changes: 4 additions & 1 deletion fogros2-rt-x/fogros2_rt_x/dataset_utils.py
@@ -107,7 +107,10 @@ def dataset2path(dataset_name):

 def load_rlds_dataset(dataset_name="kuka"):
     b = tfds.builder_from_directory(builder_dir=dataset2path(dataset_name))
-    ds = b.as_dataset(split="train[:10]")
+    if dataset_name in ["nyu_rot_dataset_converted_externally_to_rlds", "utokyo_saytap_converted_externally_to_rlds"]:
+        ds = b.as_dataset(split="train[:100%]")
+    else:
+        ds = b.as_dataset(split="train[:30]")
     return ds


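Reviewer note: `train[:30]` and `train[:100%]` are standard TFDS split slicing, an absolute slice of the first 30 episodes versus a percentage slice covering the whole train split. A minimal sketch of the same calls outside ROS (the builder directory below is illustrative, not part of this PR):

```python
import tensorflow_datasets as tfds

# Hypothetical builder directory; substitute whatever dataset2path() resolves to.
b = tfds.builder_from_directory(builder_dir="gs://gresearch/robotics/kuka/0.1.0")
first_30 = b.as_dataset(split="train[:30]")    # absolute slice: first 30 episodes
full_set = b.as_dataset(split="train[:100%]")  # percentage slice: every episode
```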
65 changes: 65 additions & 0 deletions fogros2-rt-x/fogros2_rt_x/recordall.sh
@@ -0,0 +1,65 @@
#!/bin/bash

# Open X-Embodiment datasets to replay and record
EXISTING_DATASETS=(
"fractal20220817_data"
"kuka"
"bridge"
"taco_play"
"jaco_play"
"berkeley_cable_routing"
"roboturk" #ERROR!
"nyu_door_opening_surprising_effectiveness"
"viola"
"berkeley_autolab_ur5"
"toto"
"language_table"
"columbia_cairlab_pusht_real"
"stanford_kuka_multimodal_dataset_converted_externally_to_rlds"
"nyu_rot_dataset_converted_externally_to_rlds" #ERROR!
"stanford_hydra_dataset_converted_externally_to_rlds"
"austin_buds_dataset_converted_externally_to_rlds"
"nyu_franka_play_dataset_converted_externally_to_rlds"
"maniskill_dataset_converted_externally_to_rlds"
"cmu_franka_exploration_dataset_converted_externally_to_rlds"
"ucsd_kitchen_dataset_converted_externally_to_rlds"
"ucsd_pick_and_place_dataset_converted_externally_to_rlds"
"austin_sailor_dataset_converted_externally_to_rlds"
"austin_sirius_dataset_converted_externally_to_rlds"
"bc_z"
"usc_cloth_sim_converted_externally_to_rlds"
"utokyo_pr2_opening_fridge_converted_externally_to_rlds"
"utokyo_pr2_tabletop_manipulation_converted_externally_to_rlds"
"utokyo_saytap_converted_externally_to_rlds"
"utokyo_xarm_pick_and_place_converted_externally_to_rlds"
"utokyo_xarm_bimanual_converted_externally_to_rlds"
"robo_net"
"berkeley_mvp_converted_externally_to_rlds"
"berkeley_rpt_converted_externally_to_rlds"
"kaist_nonprehensile_converted_externally_to_rlds"
"stanford_mask_vit_converted_externally_to_rlds"
"tokyo_u_lsmo_converted_externally_to_rlds"
"dlr_sara_pour_converted_externally_to_rlds"
"dlr_sara_grid_clamp_converted_externally_to_rlds"
"dlr_edan_shared_control_converted_externally_to_rlds"
"asu_table_top_converted_externally_to_rlds"
"stanford_robocook_converted_externally_to_rlds"
"eth_agent_affordances"
"imperialcollege_sawyer_wrist_cam"
"iamlab_cmu_pickup_insert_converted_externally_to_rlds"
"uiuc_d3field"
"utaustin_mutex"
"berkeley_fanuc_manipulation"
"cmu_play_fusion"
"cmu_stretch"
"berkeley_gnm_recon"
"berkeley_gnm_cory_hall"
"berkeley_gnm_sac_son"
)

# Loop through each dataset
for dataset in "${EXISTING_DATASETS[@]}"
do
# Replay and record the current dataset
ros2 launch fogros2_rt_x replayer.launch.py dataset_name:="$dataset"
done
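Usage sketch for the script above (the sourcing step and log path are assumptions about the local workspace, not part of this PR):

```bash
# From the root of a built ROS 2 workspace:
source install/setup.bash
# Keep a log to triage the datasets marked #ERROR! above.
bash fogros2-rt-x/fogros2_rt_x/recordall.sh 2>&1 | tee recordall.log
```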
75 changes: 41 additions & 34 deletions fogros2-rt-x/fogros2_rt_x/recorder.py
@@ -4,37 +4,10 @@
 from rosbag2_py import Recorder, RecordOptions, StorageOptions
 from rclpy.node import Node
 import time
-from .replayer_common import episode_recorder
+from std_srvs.srv import Empty
+from threading import Thread
+import sys
 
-def init_recorder():
-    global episode_recorder
-    storage_options = StorageOptions(
-        uri=f"rosbags/episode_{1}",
-        storage_id="sqlite3"
-    )
-    record_options = RecordOptions()
-    record_options.all = True
-    episode_recorder.record(storage_options, record_options)
-
-
-def start_new_recorder(episode_counter):
-    global episode_recorder
-    print("CURRENT EPISODE IS:", episode_counter)
-    storage_options = StorageOptions(
-        uri=f"rosbags/episode_{episode_counter}",
-        storage_id="sqlite3"
-    )
-    record_options = RecordOptions()
-    record_options.all = True
-    # episode_recorder = Recorder()
-    episode_recorder.record(storage_options, record_options)
-
-def restart_recorder():
-    global episode_recorder
-    print("ATTR:", dir(episode_recorder))
-    episode_recorder.cancel()
-    episode_recorder.stop()
 
 class DatasetRecorder(Node):
     """
@@ -52,13 +25,47 @@ class DatasetRecorder(Node):
"""
def __init__(self):
super().__init__("fogros2_rt_x_recorder")
self.declare_parameter("dataset_name", "nyu_rot_dataset_converted_externally_to_rlds")
self.dataset_name = self.get_parameter("dataset_name").value
self.new_episode_notification_service = self.create_service(Empty, 'new_episode_notification_service', self.new_episode_notification_service_callback)
self.end_dataset_notification_service = self.create_service(Empty, 'end_dataset_notification_service', self.end_dataset_notification_service_callback)
self.episode_recorder = Recorder()

self.logger = self.get_logger()
self.episode_counter = 1
init_recorder()
while True:
self.episode_counter += 1
start_new_recorder(self.episode_counter)
self.dataset_counter = 1
self.init_recorder()
self.logger.info("Recording started")

def end_dataset_notification_service_callback(self, request, response):
self.logger.info("Received request to end recording")
self.stop_recorder()
sys.exit()
return response

def new_episode_notification_service_callback(self, request, response):
self.logger.info("Received request to start new episode")
self.stop_recorder()
self.episode_counter += 1
self.start_new_recorder()
return response

def init_recorder(self):
self.start_new_recorder()

def start_new_recorder(self):
self.logger.info(f"starting episode #: {self.episode_counter}")
storage_options = StorageOptions(
uri=f"rosbags/{self.dataset_name}/episode_{self.episode_counter}",
storage_id="sqlite3"
)
record_options = RecordOptions()
record_options.all = True
self.thread = Thread(target=self.episode_recorder.record, args=(storage_options, record_options)).start()

def stop_recorder(self):
self.episode_recorder.cancel()

def main(args=None):

rclpy.init(args=args)
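Reviewer note: the recorder now splits bags per episode by exposing two `std_srvs/srv/Empty` services. A minimal sketch of exercising them from a throwaway client node (the probe node name is hypothetical; the service names match this PR):

```python
import rclpy
from std_srvs.srv import Empty

rclpy.init()
node = rclpy.create_node("recorder_probe")  # hypothetical test node
client = node.create_client(Empty, "new_episode_notification_service")
client.wait_for_service()
# The recorder cancels the current bag and opens rosbags/<dataset>/episode_{N+1}.
future = client.call_async(Empty.Request())
rclpy.spin_until_future_complete(node, future)
node.destroy_node()
rclpy.shutdown()
```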
78 changes: 54 additions & 24 deletions fogros2-rt-x/fogros2_rt_x/replayer.py
@@ -37,13 +37,14 @@
 import rosbag2_py as rosbag
 from rosbag2_py import Recorder
 from rclpy.node import Node
-from .recorder import restart_recorder
 from .dataset_utils import *
 from .dataset_spec import DatasetFeatureSpec, FeatureSpec
 from .dataset_spec import tf_feature_definition_to_ros_msg_class_str
-from .plugins.conf_base import *
+from .conf_base import *
 import time
 import tensorflow_datasets as tfds
+from std_srvs.srv import Empty
+import sys
 
 class DatasetReplayer(Node):
     """
@@ -62,11 +63,18 @@ class DatasetReplayer(Node):

     def __init__(self):
         super().__init__("fogros2_rt_x_replayer")
+        self.logger = self.get_logger()
+        # Service clients into the recorder: one to roll over to a new episode, one to end the dataset.
+        self.new_episode_notification_client = self.create_client(Empty, 'new_episode_notification_service')
+        self.new_episode_notification_req = Empty.Request()
+        self.end_dataset_notification_client = self.create_client(Empty, 'end_dataset_notification_service')
+        self.end_dataset_notification_req = Empty.Request()
 
         self.declare_parameter("dataset_name", "berkeley_fanuc_manipulation")
         dataset_name = self.get_parameter("dataset_name").value
+        self.episode_limit = 25
 
-        self.declare_parameter("per_episode_interval", 5) # second
+        self.declare_parameter("per_episode_interval", 10) # second
         self.per_episode_interval = self.get_parameter("per_episode_interval").value
         self.declare_parameter("per_step_interval", 0.2) # second
         self.per_step_interval = self.get_parameter("per_step_interval").value
@@ -76,18 +84,25 @@ def __init__(self):
         ) # as_single_topic | as_separate_topics | both
         replay_type = self.get_parameter("replay_type").value
 
-        self.episode_counter = 1
         self.dataset = load_rlds_dataset(dataset_name)
-        self.logger = self.get_logger()
         self.dataset_info = get_dataset_info([dataset_name])
         self.logger.info("Loading Dataset " + str(self.dataset_info))
 
-        self.episode = next(iter(self.dataset))
+        # self.logger.info(f"Total episodes {len(self.dataset)}") #DEBUG!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+        self.dataset_iter = iter(self.dataset)
+        self.episode = next(self.dataset_iter)
         self.dataset_features = self.dataset_info[0][1].features
         self.step_features = self.dataset_features["steps"]
         self.topics = list()
+        self.episode_counter = 1
         self.init_topics_from_features(self.step_features)
 
 
+        # while True: # DEBUG!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+        #     self.logger.info(f"Episode {self.episode_counter}")
+        #     try:
+        #         self.episode = next(self.dataset_iter)
+        #         self.episode_counter += 1
+        #     except StopIteration:
+        #         return
 
         if replay_type == "as_separate_topics":
             self.topic_name_to_publisher_dict = dict()
@@ -98,36 +113,37 @@
"Invalid replay_type: "
+ str(replay_type)
+ ". Must be one of: as_separate_topics, as_single_topic."
)
)

def init_topics_from_features(self, features):
for name, tf_feature in features.items():
# self.logger.info(f"converting {name} and type {tf_feature}")
if isinstance(tf_feature, tfds.features.FeaturesDict):
self.init_topics_from_features(tf_feature)
else:
if tf_feature.shape == () and tf_feature.dtype.is_bool:
if tf_feature.shape == () and tf_feature.tf_dtype.is_bool:
self.topics.append(FeatureSpec(name, Scalar(dtype=tf.bool)))
elif tf_feature.shape == () and tf_feature.tf_dtype is tf.string:
self.topics.append(FeatureSpec(name, Text()))
else:
self.topics.append(FeatureSpec(name, tf_feature))





def init_publisher_separate_topics(self):
for topic in self.topics:
publisher = self.create_publisher(
topic.ros_type, topic.ros_topic_name, 10
)
self.topic_name_to_publisher_dict[topic.ros_topic_name] = publisher

self.create_timer(
self.timer = self.create_timer(
self.per_episode_interval, self.timer_callback_separate_topics
)

def timer_callback_separate_topics(self):
for step in self.episode["steps"]:
for topic in self.topics:
# self.logger.info(
# f"Publishing data {step[topic.tf_name]} on topic {topic.ros_topic_name}"
# )
if topic.tf_name in step:
# Fetch from step data
msg = topic.convert_tf_tensor_data_to_ros2_msg(
@@ -157,18 +173,32 @@ def timer_callback_separate_topics(self):

             self.check_last_step_update_recorder(step)
             time.sleep(self.per_step_interval)
 
-        self.episode = next(iter(self.dataset))
-        self.create_timer(
-            self.per_episode_interval, self.timer_callback_separate_topics
-        )
+        try:
+            self.episode = next(self.dataset_iter)
+        except StopIteration:
+            self.end_of_dataset_sequence()
 
+    def end_of_dataset_sequence(self):
+        self.logger.info("End of the current dataset or exceeding episode limit")
+        while not self.end_dataset_notification_client.wait_for_service(timeout_sec=1.0):
+            self.get_logger().info('Service not available, waiting again...')
+        self.future = self.end_dataset_notification_client.call_async(self.end_dataset_notification_req)
+        self.future.result()
+        self.timer.destroy()
+        sys.exit()
+
     def check_last_step_update_recorder(self, step):
         if step["is_last"]:
             self.logger.info("End of the current episode")
             self.episode_counter += 1
-            # restart_recorder(self.episode_counter)
-            restart_recorder()
+            if self.episode_counter > self.episode_limit:
+                self.logger.info(f"Exceeding episode limit. Current: {self.episode_counter}, limit {self.episode_limit}")
+                self.end_of_dataset_sequence()
+            while not self.new_episode_notification_client.wait_for_service(timeout_sec=1.0):
+                self.get_logger().info('Service not available, waiting again...')
+            self.future = self.new_episode_notification_client.call_async(self.new_episode_notification_req)
+            self.future.result()
 
 
def main(args=None):
# tf.experimental.numpy.experimental_enable_numpy_behavior()
Expand All @@ -183,4 +213,4 @@ def main(args=None):


if __name__ == "__main__":
main()
main()
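One reviewer caveat on the service handshake above: `call_async(...)` followed immediately by `future.result()` does not wait for the recorder's reply, because the future only completes while the node spins. A hedged sketch of the blocking pattern (client and request names follow this PR; the executor remark is a general rclpy caveat, not something this PR configures):

```python
# Inside DatasetReplayer; blocks until the recorder acknowledges the request.
future = self.new_episode_notification_client.call_async(self.new_episode_notification_req)
rclpy.spin_until_future_complete(self, future)
# If this runs inside a timer callback on a single-threaded executor, it needs a
# reentrant callback group or a MultiThreadedExecutor to avoid deadlocking.
```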
6 changes: 0 additions & 6 deletions fogros2-rt-x/fogros2_rt_x/replayer_common.py

This file was deleted.

12 changes: 8 additions & 4 deletions fogros2-rt-x/launch/replayer.launch.py
@@ -33,18 +33,22 @@

 from launch import LaunchDescription
 from launch_ros.actions import Node
 
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
 
 def generate_launch_description():
     """Launch the dataset replayer and recorder locally."""
-    ld = LaunchDescription()
 
+    ld = LaunchDescription([
+        DeclareLaunchArgument('dataset_name', default_value='berkeley_fanuc_manipulation')
+    ])
+
     replayer_node = Node(
         package="fogros2_rt_x",
         executable="replayer",
         output="screen",
         parameters = [
-            {"dataset_name": "berkeley_fanuc_manipulation"},
+            {"dataset_name": LaunchConfiguration('dataset_name')},
         ]
     )
 
@@ -55,7 +59,7 @@ def generate_launch_description():
executable="recorder",
output="screen",
parameters = [
{"dataset_name": "berkeley_fanuc_manipulation"},
{"dataset_name": LaunchConfiguration('dataset_name')},
]
)
ld.add_action(recorder_node)
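With `dataset_name` declared as a launch argument, the dataset can now be chosen from the command line, which is exactly what recordall.sh relies on:

```bash
ros2 launch fogros2_rt_x replayer.launch.py dataset_name:=kuka
```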