Skip to content

Commit

Permalink
ws broadcast client in dispatcher node (#212)
Browse files Browse the repository at this point in the history
* impl ws broadcast client in dispatcher node

Signed-off-by: youliang <tan_you_liang@hotmail.com>

* nit

Signed-off-by: youliang <tan_you_liang@hotmail.com>

* create rmf_websocket pkg

Signed-off-by: youliang <tan_you_liang@hotmail.com>

* make rmf_fleet_adapter depends on rmf_websocket, and nit

Signed-off-by: youliang <tan_you_liang@hotmail.com>

* fix cmakelist

Signed-off-by: youliang <tan_you_liang@hotmail.com>

* cleanups

Signed-off-by: youliang <tan_you_liang@hotmail.com>

* create broadcast server class, and further cleanups

Signed-off-by: youliang <tan_you_liang@hotmail.com>

* Use find instead of at

Signed-off-by: Michael X. Grey <grey@openrobotics.org>

Co-authored-by: Michael X. Grey <grey@openrobotics.org>
  • Loading branch information
youliangtan and mxgrey committed Jul 14, 2022
1 parent e9a0516 commit 3be4edf
Show file tree
Hide file tree
Showing 23 changed files with 808 additions and 352 deletions.
11 changes: 6 additions & 5 deletions rmf_fleet_adapter/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ set(dep_pkgs
rmf_task_sequence
std_msgs
rmf_api_msgs
websocketpp
rmf_websocket
nlohmann_json
nlohmann_json_schema_validator_vendor
)
Expand Down Expand Up @@ -83,13 +83,13 @@ target_link_libraries(rmf_fleet_adapter
${rmf_task_msgs_LIBRARIES}
PRIVATE
rmf_rxcpp
rmf_websocket::rmf_websocket
nlohmann_json::nlohmann_json
rmf_api_msgs::rmf_api_msgs
${rmf_door_msgs_LIBRARIES}
${rmf_lift_msgs_LIBRARIES}
${rmf_dispenser_msgs_LIBRARIES}
${rmf_ingestor_msgs_LIBRARIES}
${websocketpp_LIBRARIES}
nlohmann_json_schema_validator
)

Expand All @@ -109,7 +109,8 @@ target_include_directories(rmf_fleet_adapter
${rmf_dispenser_msgs_INCLUDE_DIRS}
${rmf_ingestor_msgs_INCLUDE_DIRS}
${rmf_api_msgs_INCLUDE_DIRS}
${WEBSOCKETPP_INCLUDE_DIR}
${rmf_websocket_INCLUDE_DIR}
${rmf_traffic_ros2_INCLUDE_DIRS}
${nlohmann_json_schema_validator_INCLUDE_DIRS}
)

Expand Down Expand Up @@ -145,7 +146,7 @@ if (BUILD_TESTING)
${rmf_ingestor_msgs_INCLUDE_DIRS}
${rmf_api_msgs_INCLUDE_DIRS}
${std_msgs_INCLUDE_DIRS}
${WEBSOCKETPP_INCLUDE_DIR}
${rmf_websocket_INCLUDE_DIR}
${nlohmann_json_schema_validator_INCLUDE_DIRS}
)
target_link_libraries(test_rmf_fleet_adapter
Expand All @@ -161,7 +162,7 @@ if (BUILD_TESTING)
rmf_utils::rmf_utils
rmf_api_msgs::rmf_api_msgs
${std_msgs_LIBRARIES}
${websocketpp_LIBRARIES}
${rmf_websocket_INCLUDE_DIR}
nlohmann_json_schema_validator
)

Expand Down
2 changes: 1 addition & 1 deletion rmf_fleet_adapter/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<depend condition="$RMF_ENABLE_FAILOVER == 1">stubborn_buddies</depend>
<depend condition="$RMF_ENABLE_FAILOVER == 1">stubborn_buddies_msgs</depend>

<depend>libwebsocketpp-dev</depend>
<depend>rmf_websocket</depend>
<depend>nlohmann-json-dev</depend>
<depend>nlohmann_json_schema_validator_vendor</depend>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"type": "array",
"items": { "$ref": "place.json" }
}
}
},
"required": ["place"]
}
]
}
219 changes: 0 additions & 219 deletions rmf_fleet_adapter/src/rmf_fleet_adapter/BroadcastClient.cpp

This file was deleted.

7 changes: 4 additions & 3 deletions rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ namespace rmf_fleet_adapter {
//==============================================================================
TaskManagerPtr TaskManager::make(
agv::RobotContextPtr context,
std::optional<std::weak_ptr<BroadcastClient>> broadcast_client,
std::optional<std::weak_ptr<rmf_websocket::BroadcastClient>> broadcast_client,
std::weak_ptr<agv::FleetUpdateHandle> fleet_handle)
{
auto mgr = TaskManagerPtr(
Expand Down Expand Up @@ -254,7 +254,7 @@ TaskManagerPtr TaskManager::make(
//==============================================================================
TaskManager::TaskManager(
agv::RobotContextPtr context,
std::optional<std::weak_ptr<BroadcastClient>> broadcast_client,
std::optional<std::weak_ptr<rmf_websocket::BroadcastClient>> broadcast_client,
std::weak_ptr<agv::FleetUpdateHandle> fleet_handle)
: _context(std::move(context)),
_broadcast_client(std::move(broadcast_client)),
Expand Down Expand Up @@ -861,7 +861,8 @@ agv::ConstRobotContextPtr TaskManager::context() const
}

//==============================================================================
std::optional<std::weak_ptr<BroadcastClient>> TaskManager::broadcast_client()
std::optional<std::weak_ptr<rmf_websocket::BroadcastClient>> TaskManager::
broadcast_client()
const
{
return _broadcast_client;
Expand Down
11 changes: 6 additions & 5 deletions rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

#include "LegacyTask.hpp"
#include "agv/RobotContext.hpp"
#include "BroadcastClient.hpp"
#include <rmf_websocket/BroadcastClient.hpp>

#include <rmf_traffic/agv/Planner.hpp>

Expand Down Expand Up @@ -52,7 +52,7 @@ class TaskManager : public std::enable_shared_from_this<TaskManager>

static std::shared_ptr<TaskManager> make(
agv::RobotContextPtr context,
std::optional<std::weak_ptr<BroadcastClient>> broadcast_client,
std::optional<std::weak_ptr<rmf_websocket::BroadcastClient>> broadcast_client,
std::weak_ptr<agv::FleetUpdateHandle> fleet_handle);

using Start = rmf_traffic::agv::Plan::Start;
Expand Down Expand Up @@ -106,7 +106,8 @@ class TaskManager : public std::enable_shared_from_this<TaskManager>

agv::ConstRobotContextPtr context() const;

std::optional<std::weak_ptr<BroadcastClient>> broadcast_client() const;
std::optional<std::weak_ptr<rmf_websocket::BroadcastClient>> broadcast_client()
const;

/// Set the queue for this task manager with assignments generated from the
/// task planner
Expand Down Expand Up @@ -208,7 +209,7 @@ class TaskManager : public std::enable_shared_from_this<TaskManager>

TaskManager(
agv::RobotContextPtr context,
std::optional<std::weak_ptr<BroadcastClient>> broadcast_client,
std::optional<std::weak_ptr<rmf_websocket::BroadcastClient>> broadcast_client,
std::weak_ptr<agv::FleetUpdateHandle>);

class ActiveTask
Expand Down Expand Up @@ -301,7 +302,7 @@ class TaskManager : public std::enable_shared_from_this<TaskManager>
friend class ActiveTask;

agv::RobotContextPtr _context;
std::optional<std::weak_ptr<BroadcastClient>> _broadcast_client;
std::optional<std::weak_ptr<rmf_websocket::BroadcastClient>> _broadcast_client;
std::weak_ptr<agv::FleetUpdateHandle> _fleet_handle;
rmf_task::ConstActivatorPtr _task_activator;
ActiveTask _active_task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,10 @@ void FleetUpdateHandle::Implementation::update_fleet_state() const
make_validator(rmf_api_msgs::schemas::fleet_state_update);

validator.validate(fleet_state_update_msg);

std::unique_lock<std::mutex> lock(*update_callback_mutex);
if (update_callback)
update_callback(fleet_state_update_msg);
broadcast_client->publish(fleet_state_update_msg);
}
catch (const std::exception& e)
Expand Down Expand Up @@ -928,6 +932,10 @@ void FleetUpdateHandle::Implementation::update_fleet_logs() const
make_validator(rmf_api_msgs::schemas::fleet_log_update);

validator.validate(fleet_log_update_msg);

std::unique_lock<std::mutex> lock(*update_callback_mutex);
if (update_callback)
update_callback(fleet_log_update_msg);
broadcast_client->publish(fleet_log_update_msg);
}
catch (const std::exception& e)
Expand Down Expand Up @@ -1375,7 +1383,7 @@ void FleetUpdateHandle::add_robot(
return;
}

std::optional<std::weak_ptr<BroadcastClient>>
std::optional<std::weak_ptr<rmf_websocket::BroadcastClient>>
broadcast_client = std::nullopt;

if (fleet->_pimpl->broadcast_client)
Expand Down
Loading

0 comments on commit 3be4edf

Please sign in to comment.