diff --git a/rmf_fleet_adapter/CMakeLists.txt b/rmf_fleet_adapter/CMakeLists.txt index ce99ddd1b..1deac4172 100644 --- a/rmf_fleet_adapter/CMakeLists.txt +++ b/rmf_fleet_adapter/CMakeLists.txt @@ -38,7 +38,7 @@ set(dep_pkgs rmf_task_sequence std_msgs rmf_api_msgs - websocketpp + rmf_websocket nlohmann_json nlohmann_json_schema_validator_vendor ) @@ -83,13 +83,13 @@ target_link_libraries(rmf_fleet_adapter ${rmf_task_msgs_LIBRARIES} PRIVATE rmf_rxcpp + rmf_websocket::rmf_websocket nlohmann_json::nlohmann_json rmf_api_msgs::rmf_api_msgs ${rmf_door_msgs_LIBRARIES} ${rmf_lift_msgs_LIBRARIES} ${rmf_dispenser_msgs_LIBRARIES} ${rmf_ingestor_msgs_LIBRARIES} - ${websocketpp_LIBRARIES} nlohmann_json_schema_validator ) @@ -109,7 +109,8 @@ target_include_directories(rmf_fleet_adapter ${rmf_dispenser_msgs_INCLUDE_DIRS} ${rmf_ingestor_msgs_INCLUDE_DIRS} ${rmf_api_msgs_INCLUDE_DIRS} - ${WEBSOCKETPP_INCLUDE_DIR} + ${rmf_websocket_INCLUDE_DIR} + ${rmf_traffic_ros2_INCLUDE_DIRS} ${nlohmann_json_schema_validator_INCLUDE_DIRS} ) @@ -145,7 +146,7 @@ if (BUILD_TESTING) ${rmf_ingestor_msgs_INCLUDE_DIRS} ${rmf_api_msgs_INCLUDE_DIRS} ${std_msgs_INCLUDE_DIRS} - ${WEBSOCKETPP_INCLUDE_DIR} + ${rmf_websocket_INCLUDE_DIR} ${nlohmann_json_schema_validator_INCLUDE_DIRS} ) target_link_libraries(test_rmf_fleet_adapter @@ -161,7 +162,7 @@ if (BUILD_TESTING) rmf_utils::rmf_utils rmf_api_msgs::rmf_api_msgs ${std_msgs_LIBRARIES} - ${websocketpp_LIBRARIES} + ${rmf_websocket_INCLUDE_DIR} nlohmann_json_schema_validator ) diff --git a/rmf_fleet_adapter/package.xml b/rmf_fleet_adapter/package.xml index 02782d39a..8e046995c 100644 --- a/rmf_fleet_adapter/package.xml +++ b/rmf_fleet_adapter/package.xml @@ -31,7 +31,7 @@ stubborn_buddies stubborn_buddies_msgs - libwebsocketpp-dev + rmf_websocket nlohmann-json-dev nlohmann_json_schema_validator_vendor diff --git a/rmf_fleet_adapter/schemas/event_description__go_to_place.json b/rmf_fleet_adapter/schemas/event_description__go_to_place.json index 9e0d5a1f8..8d932d01f 100644 --- a/rmf_fleet_adapter/schemas/event_description__go_to_place.json +++ b/rmf_fleet_adapter/schemas/event_description__go_to_place.json @@ -16,7 +16,8 @@ "type": "array", "items": { "$ref": "place.json" } } - } + }, + "required": ["place"] } ] } diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/BroadcastClient.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/BroadcastClient.cpp deleted file mode 100644 index b55d395d4..000000000 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/BroadcastClient.cpp +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Copyright (C) 2021 Open Source Robotics Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * -*/ - -#include "BroadcastClient.hpp" - -#include "agv/internal_FleetUpdateHandle.hpp" - -namespace rmf_fleet_adapter { -//============================================================================== -std::shared_ptr BroadcastClient::make( - const std::string& uri, - std::weak_ptr fleet_handle) -{ - std::shared_ptr client(new BroadcastClient()); - client->_uri = std::move(uri); - client->_fleet_handle = fleet_handle; - client->_shutdown = false; - client->_connected = false; - - // Initialize the Asio transport policy - client->_client.clear_access_channels(websocketpp::log::alevel::all); - client->_client.clear_error_channels(websocketpp::log::elevel::all); - client->_client.init_asio(); - client->_client.start_perpetual(); - client->_client_thread = std::thread( - [c = client]() - { - c->_client.run(); - }); - - client->_client.set_open_handler( - [c = client](websocketpp::connection_hdl) - { - c->_connected = true; - const auto fleet = c->_fleet_handle.lock(); - if (!fleet) - return; - const auto& impl = agv::FleetUpdateHandle::Implementation::get(*fleet); - for (const auto& [conext, mgr] : impl.task_managers) - { - // Publish all task logs to the server - c->publish(mgr->task_log_updates()); - } - RCLCPP_INFO( - impl.node->get_logger(), - "BroadcastClient successfully connected to uri: [%s]", - c->_uri.c_str()); - }); - - client->_client.set_close_handler( - [c = client](websocketpp::connection_hdl) - { - c->_connected = false; - }); - - client->_client.set_fail_handler( - [c = client](websocketpp::connection_hdl) - { - c->_connected = false; - }); - - client->_processing_thread = std::thread( - [c = client]() - { - while (!c->_shutdown) - { - const auto fleet = c->_fleet_handle.lock(); - if (!fleet) - { - continue; - } - const auto& impl = agv::FleetUpdateHandle::Implementation::get(*fleet); - - // Try to connect to the server if we are not connected yet - if (!c->_connected) - { - websocketpp::lib::error_code ec; - WebsocketClient::connection_ptr con = c->_client.get_connection( - c->_uri, ec); - - if (con) - { - c->_hdl = con->get_handle(); - c->_client.connect(con); - // TOD(YV): Without sending a test payload, ec seems to be 0 even - // when the client has not connected. Avoid sending this message. - c->_client.send(c->_hdl, "Hello", websocketpp::frame::opcode::text, - ec); - } - - if (!con || ec) - { - RCLCPP_WARN( - impl.node->get_logger(), - "BroadcastClient unable to connect to [%s]. Please make sure " - "server is running. Error msg: %s", - c->_uri.c_str(), - ec.message().c_str()); - c->_connected = false; - std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - continue; - } - - RCLCPP_INFO( - impl.node->get_logger(), - "BroadcastClient successfully connected to [%s]", - c->_uri.c_str()); - c->_connected = true; - } - - std::unique_lock lock(c->_wait_mutex); - c->_cv.wait(lock, - [c]() - { - return !c->_queue.empty(); - }); - - while (!c->_queue.empty()) - { - std::lock_guard lock(c->_queue_mutex); - websocketpp::lib::error_code ec; - const std::string& msg = c->_queue.front().dump(); - c->_client.send(c->_hdl, msg, websocketpp::frame::opcode::text, ec); - if (ec) - { - RCLCPP_ERROR( - impl.node->get_logger(), - "BroadcastClient unable to publish message: %s", - ec.message().c_str()); - // TODO(YV): Check if we should re-connect to server - break; - } - c->_queue.pop(); - } - } - }); - - return client; - -} - -//============================================================================== -void BroadcastClient::publish(const nlohmann::json& msg) -{ - { - const auto fleet = _fleet_handle.lock(); - if (!fleet) - return; - - auto& impl = agv::FleetUpdateHandle::Implementation::get(*fleet); - std::unique_lock lock(*impl.update_callback_mutex); - if (impl.update_callback) - impl.update_callback(msg); - } - - std::lock_guard lock(_queue_mutex); - _queue.push(msg); - _cv.notify_all(); -} - -//============================================================================== -void BroadcastClient::publish(const std::vector& msgs) -{ - { - const auto fleet = _fleet_handle.lock(); - if (!fleet) - return; - - auto& impl = agv::FleetUpdateHandle::Implementation::get(*fleet); - std::unique_lock lock(*impl.update_callback_mutex); - if (impl.update_callback) - { - for (const auto& msg : msgs) - impl.update_callback(msg); - } - } - - std::lock_guard lock(_queue_mutex); - for (const auto& msg : msgs) - _queue.push(msg); - _cv.notify_all(); -} - -//============================================================================== -BroadcastClient::BroadcastClient() -{ - // Do nothing -} - -//============================================================================== -BroadcastClient::~BroadcastClient() -{ - _shutdown = true; - if (_processing_thread.joinable()) - { - _processing_thread.join(); - } - if (_client_thread.joinable()) - { - _client_thread.join(); - } - _client.stop_perpetual(); -} - -} // namespace rmf_fleet_adapter diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index 6f45d659e..d62a5207d 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -70,7 +70,7 @@ namespace rmf_fleet_adapter { //============================================================================== TaskManagerPtr TaskManager::make( agv::RobotContextPtr context, - std::optional> broadcast_client, + std::optional> broadcast_client, std::weak_ptr fleet_handle) { auto mgr = TaskManagerPtr( @@ -254,7 +254,7 @@ TaskManagerPtr TaskManager::make( //============================================================================== TaskManager::TaskManager( agv::RobotContextPtr context, - std::optional> broadcast_client, + std::optional> broadcast_client, std::weak_ptr fleet_handle) : _context(std::move(context)), _broadcast_client(std::move(broadcast_client)), @@ -861,7 +861,8 @@ agv::ConstRobotContextPtr TaskManager::context() const } //============================================================================== -std::optional> TaskManager::broadcast_client() +std::optional> TaskManager:: +broadcast_client() const { return _broadcast_client; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index 179592277..36e9069cf 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -20,7 +20,7 @@ #include "LegacyTask.hpp" #include "agv/RobotContext.hpp" -#include "BroadcastClient.hpp" +#include #include @@ -52,7 +52,7 @@ class TaskManager : public std::enable_shared_from_this static std::shared_ptr make( agv::RobotContextPtr context, - std::optional> broadcast_client, + std::optional> broadcast_client, std::weak_ptr fleet_handle); using Start = rmf_traffic::agv::Plan::Start; @@ -106,7 +106,8 @@ class TaskManager : public std::enable_shared_from_this agv::ConstRobotContextPtr context() const; - std::optional> broadcast_client() const; + std::optional> broadcast_client() + const; /// Set the queue for this task manager with assignments generated from the /// task planner @@ -208,7 +209,7 @@ class TaskManager : public std::enable_shared_from_this TaskManager( agv::RobotContextPtr context, - std::optional> broadcast_client, + std::optional> broadcast_client, std::weak_ptr); class ActiveTask @@ -301,7 +302,7 @@ class TaskManager : public std::enable_shared_from_this friend class ActiveTask; agv::RobotContextPtr _context; - std::optional> _broadcast_client; + std::optional> _broadcast_client; std::weak_ptr _fleet_handle; rmf_task::ConstActivatorPtr _task_activator; ActiveTask _active_task; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index 58132d401..ad185d65b 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -879,6 +879,10 @@ void FleetUpdateHandle::Implementation::update_fleet_state() const make_validator(rmf_api_msgs::schemas::fleet_state_update); validator.validate(fleet_state_update_msg); + + std::unique_lock lock(*update_callback_mutex); + if (update_callback) + update_callback(fleet_state_update_msg); broadcast_client->publish(fleet_state_update_msg); } catch (const std::exception& e) @@ -928,6 +932,10 @@ void FleetUpdateHandle::Implementation::update_fleet_logs() const make_validator(rmf_api_msgs::schemas::fleet_log_update); validator.validate(fleet_log_update_msg); + + std::unique_lock lock(*update_callback_mutex); + if (update_callback) + update_callback(fleet_log_update_msg); broadcast_client->publish(fleet_log_update_msg); } catch (const std::exception& e) @@ -1375,7 +1383,7 @@ void FleetUpdateHandle::add_robot( return; } - std::optional> + std::optional> broadcast_client = std::nullopt; if (fleet->_pimpl->broadcast_client) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp index acb7d3d0e..f48102660 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp @@ -43,8 +43,8 @@ #include "Node.hpp" #include "RobotContext.hpp" #include "../TaskManager.hpp" -#include "../BroadcastClient.hpp" #include "../DeserializeJSON.hpp" +#include #include #include @@ -263,7 +263,7 @@ class FleetUpdateHandle::Implementation std::unordered_map> task_managers = {}; - std::shared_ptr broadcast_client = nullptr; + std::shared_ptr broadcast_client = nullptr; // Map uri to schema for validator loader function std::unordered_map schema_dictionary = {}; @@ -407,9 +407,19 @@ class FleetUpdateHandle::Implementation // Start the BroadcastClient if (handle->_pimpl->server_uri.has_value()) { - handle->_pimpl->broadcast_client = BroadcastClient::make( + handle->_pimpl->broadcast_client = rmf_websocket::BroadcastClient::make( handle->_pimpl->server_uri.value(), - handle->weak_from_this()); + handle->_pimpl->node, + [handle]() + { + std::vector task_logs; + for (const auto& [conext, mgr] : handle->_pimpl->task_managers) + { + // Publish all task logs to the server + task_logs.push_back(mgr->task_log_updates()); + } + return task_logs; + }); } // Add PerformAction event to deserialization diff --git a/rmf_fleet_adapter/test/tasks/test_Delivery.cpp b/rmf_fleet_adapter/test/tasks/test_Delivery.cpp index ab5bef270..be3b71277 100644 --- a/rmf_fleet_adapter/test/tasks/test_Delivery.cpp +++ b/rmf_fleet_adapter/test/tasks/test_Delivery.cpp @@ -16,7 +16,7 @@ */ #include "../mock/MockRobotCommand.hpp" -#include "../mock/MockWebSocketServer.hpp" +#include #include #include @@ -371,8 +371,8 @@ SCENARIO("Test Delivery") /// of the targeted task id, by listening to the task_state_update /// from the websocket connection /* *INDENT-OFF* */ - using MockServer = rmf_fleet_adapter_test::MockWebSocketServer; - MockServer mock_server( + using WebsocketServer = rmf_websocket::BroadcastServer; + const auto ws_server = WebsocketServer::make( 37878, [ &cb_mutex, delivery_id, &completed_promise, &at_least_one_incomplete, &fulfilled_promise]( @@ -383,7 +383,7 @@ SCENARIO("Test Delivery") assert(data.contains("status")); const auto id = data.at("booking").at("id"); const auto status = data.at("status"); - std::cout << "[MockWebSocketServer] id: [" << id + std::cout << "[WebSocketServer] id: [" << id << "] ::: json state ::: " << status << std::endl; if (id == delivery_id && status == "completed") @@ -397,7 +397,7 @@ SCENARIO("Test Delivery") else at_least_one_incomplete = true; }, - MockServer::ApiMsgType::TaskStateUpdate + WebsocketServer::ApiMsgType::TaskStateUpdate ); /* *INDENT-ON* */ @@ -474,7 +474,7 @@ SCENARIO("Test Delivery") auto flaky_future = flaky_ingestor->success_promise.get_future(); adapter.start(); - mock_server.start(); + ws_server->start(); // Note: wait for task_manager to start, else TM will be suspicously "empty" std::this_thread::sleep_for(1s); @@ -522,6 +522,6 @@ SCENARIO("Test Delivery") std::lock_guard lock(cb_mutex); CHECK(at_least_one_incomplete); - mock_server.stop(); + ws_server->stop(); adapter.stop(); } diff --git a/rmf_fleet_adapter/test/tasks/test_Loop.cpp b/rmf_fleet_adapter/test/tasks/test_Loop.cpp index c6ef42d03..f38ba561c 100644 --- a/rmf_fleet_adapter/test/tasks/test_Loop.cpp +++ b/rmf_fleet_adapter/test/tasks/test_Loop.cpp @@ -16,7 +16,7 @@ */ #include "../mock/MockRobotCommand.hpp" -#include "../mock/MockWebSocketServer.hpp" +#include #include #include @@ -166,8 +166,8 @@ SCENARIO("Test loop requests", "[.flaky]") /// of the targeted task id, by listening to the task_state_update /// from the websocket connection /* *INDENT-OFF* */ - using MockServer = rmf_fleet_adapter_test::MockWebSocketServer; - MockServer mock_server( + using WebsocketServer = rmf_websocket::BroadcastServer; + const auto ws_server = WebsocketServer::make( 27878, [ &loop_0, &task_0_completed_promise, &completed_0_count, &loop_1, &task_1_completed_promise, &completed_1_count, @@ -178,7 +178,7 @@ SCENARIO("Test loop requests", "[.flaky]") assert(data.contains("status")); const auto id = data.at("booking").at("id"); const auto status = data.at("status"); - std::cout << "[MockWebSocketServer] id: [" << id + std::cout << "[WebSocketServer] id: [" << id << "] ::: json state ::: " << status << std::endl; if (status == "completed") @@ -226,7 +226,7 @@ SCENARIO("Test loop requests", "[.flaky]") // } // } }, - MockServer::ApiMsgType::TaskStateUpdate); + WebsocketServer::ApiMsgType::TaskStateUpdate); /* *INDENT-ON* */ const std::size_t n_loops = 3; @@ -301,7 +301,7 @@ SCENARIO("Test loop requests", "[.flaky]") }); adapter.start(); - mock_server.start(); + ws_server->start(); // Note: wait for task_manager to start, else TM will be suspicously "empty" std::this_thread::sleep_for(1s); @@ -324,7 +324,7 @@ SCENARIO("Test loop requests", "[.flaky]") // Dispatch Loop 1 Task places.clear(); places.push_back(north); - places.push_back(east); + places.push_back(south); adapter.dispatch_task(loop_1, request); const auto task_0_completed_status = task_0_completed_future.wait_for(20s); @@ -414,5 +414,5 @@ SCENARIO("Test loop requests", "[.flaky]") // } adapter.stop(); - mock_server.stop(); + ws_server->stop(); } diff --git a/rmf_task_ros2/CMakeLists.txt b/rmf_task_ros2/CMakeLists.txt index 0cd778244..661ade0fe 100644 --- a/rmf_task_ros2/CMakeLists.txt +++ b/rmf_task_ros2/CMakeLists.txt @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.5) project(rmf_task_ros2) if(NOT CMAKE_CXX_STANDARD) - set(CMAKE_CXX_STANDARD 14) + set(CMAKE_CXX_STANDARD 17) endif() if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") # we dont use add_compile_options with pedantic in message packages @@ -18,6 +18,7 @@ find_package(rmf_api_msgs REQUIRED) find_package(rmf_traffic REQUIRED) find_package(rmf_traffic_ros2 REQUIRED) find_package(rmf_task_msgs REQUIRED) +find_package(rmf_websocket REQUIRED) find_package(Eigen3 REQUIRED) find_package(rclcpp REQUIRED) find_package(nlohmann_json REQUIRED) @@ -31,6 +32,7 @@ target_link_libraries(rmf_task_ros2 rmf_api_msgs::rmf_api_msgs rmf_traffic::rmf_traffic rmf_traffic_ros2::rmf_traffic_ros2 + rmf_websocket::rmf_websocket ${rmf_task_msgs_LIBRARIES} ${rclcpp_LIBRARIES} nlohmann_json::nlohmann_json @@ -43,6 +45,7 @@ target_include_directories(rmf_task_ros2 $ ${rmf_traffic_ros2_INCLUDE_DIRS} ${rmf_task_msgs_INCLUDE_DIRS} + ${rmf_websocket_INCLUDE_DIRS} ${rclcpp_INCLUDE_DIRS} ) @@ -73,6 +76,7 @@ if(BUILD_TESTING) target_link_libraries(test_rmf_task_ros2 rmf_task_ros2 rmf_traffic::rmf_traffic + rmf_websocket::rmf_websocket rmf_traffic_ros2::rmf_traffic_ros2 -pthread ) diff --git a/rmf_task_ros2/include/rmf_task_ros2/DispatchState.hpp b/rmf_task_ros2/include/rmf_task_ros2/DispatchState.hpp index cba5336f1..1bf0e9784 100644 --- a/rmf_task_ros2/include/rmf_task_ros2/DispatchState.hpp +++ b/rmf_task_ros2/include/rmf_task_ros2/DispatchState.hpp @@ -78,11 +78,17 @@ struct DispatchState /// Any errors that have occurred for this dispatching std::vector errors; + /// task request form + nlohmann::json request; + DispatchState(std::string task_id, rmf_traffic::Time submission_time); }; using DispatchStatePtr = std::shared_ptr; +//============================================================================== +std::string status_to_string(DispatchState::Status status); + //============================================================================== rmf_task_msgs::msg::Assignment convert( const std::optional& assignment); diff --git a/rmf_task_ros2/package.xml b/rmf_task_ros2/package.xml index e90127ade..f3a7afaad 100644 --- a/rmf_task_ros2/package.xml +++ b/rmf_task_ros2/package.xml @@ -16,6 +16,7 @@ rmf_traffic_ros2 rmf_task_msgs rclcpp + rmf_websocket nlohmann-json-dev nlohmann_json_schema_validator_vendor diff --git a/rmf_task_ros2/src/rmf_task_ros2/DispatchState.cpp b/rmf_task_ros2/src/rmf_task_ros2/DispatchState.cpp index bbec191ec..66e355488 100644 --- a/rmf_task_ros2/src/rmf_task_ros2/DispatchState.cpp +++ b/rmf_task_ros2/src/rmf_task_ros2/DispatchState.cpp @@ -32,6 +32,27 @@ DispatchState::DispatchState( // Do nothing } +//============================================================================== +std::string status_to_string(DispatchState::Status status) +{ + using Status = DispatchState::Status; + switch (status) + { + case Status::Queued: + return "queued"; + case Status::Selected: + return "selected"; + case Status::Dispatched: + return "dispatched"; + case Status::FailedToAssign: + return "failed_to_assign"; + case Status::CanceledInFlight: + return "canceled_in_flight"; + default: + return "failed_to_assign"; + } +} + //============================================================================= rmf_task_msgs::msg::Assignment convert( const std::optional& assignment) diff --git a/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp b/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp index 2e5fcb099..26cff59e8 100644 --- a/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp +++ b/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp @@ -18,6 +18,8 @@ #include #include +#include + #include #include @@ -97,6 +99,10 @@ class Dispatcher::Implementation std::shared_ptr node; std::shared_ptr auctioneer; + std::shared_ptr broadcast_client; + + const nlohmann::json _task_state_update_json = + {{"type", "task_state_update"}, {"data", {}}}; using SubmitTaskSrv = rmf_task_msgs::srv::SubmitTask; using CancelTaskSrv = rmf_task_msgs::srv::CancelTask; @@ -204,6 +210,18 @@ class Dispatcher::Implementation " Declared publish_active_tasks_period as: %d secs", publish_active_tasks_period); + + std::optional server_uri = std::nullopt; + const std::string uri = + node->declare_parameter("server_uri", std::string()); + if (!uri.empty()) + { + RCLCPP_INFO( + node->get_logger(), + "API server URI: [%s]", uri.c_str()); + server_uri = uri; + } + const auto qos = rclcpp::ServicesQoS().reliable(); dispatch_states_pub = node->create_publisher( rmf_task_ros2::DispatchStatesTopicName, qos); @@ -245,6 +263,10 @@ class Dispatcher::Implementation this->handle_dispatch_ack(*msg); }); + if (server_uri) + broadcast_client = rmf_websocket::BroadcastClient::make( + *server_uri, node); + auctioneer = bidding::Auctioneer::make( node, [this]( @@ -530,35 +552,14 @@ class Dispatcher::Implementation nlohmann::json push_bid_notice(bidding::BidNoticeMsg bid_notice) { - nlohmann::json state; - auto& booking = state["booking"]; - booking["id"] = bid_notice.task_id; - const auto request = nlohmann::json::parse(bid_notice.request); - static const std::vector copy_fields = { - "unix_millis_earliest_start_time", - "priority", - "labels" - }; - - for (const auto& field : copy_fields) - { - const auto f_it = request.find(field); - if (f_it != request.end()) - booking[field] = f_it.value(); - } - - state["category"] = request["category"]; - state["detail"] = request["description"]; - - auto& dispatch = state["dispatch"]; - dispatch["status"] = "queued"; - - // TODO(MXG): Publish this initial task state message to the websocket! - auto new_dispatch_state = std::make_shared( bid_notice.task_id, std::chrono::steady_clock::now()); + new_dispatch_state->request = request; + + // Publish this initial task state message to the websocket + auto state = publish_task_state_ws(new_dispatch_state, "queued"); active_dispatch_states[bid_notice.task_id] = new_dispatch_state; @@ -740,6 +741,9 @@ class Dispatcher::Implementation ++error_count; } + /// Publish failed bid + publish_task_state_ws(dispatch_state, "failed"); + auctioneer->ready_for_next_bid(); return; } @@ -770,6 +774,55 @@ class Dispatcher::Implementation dispatch_command_pub->publish(award_command); } + //============================================================================== + nlohmann::json publish_task_state_ws( + const std::shared_ptr state, + const std::string& status) + { + nlohmann::json task_state; + auto& booking = task_state["booking"]; + booking["id"] = state->task_id; + + static const std::vector copy_fields = { + "unix_millis_earliest_start_time", + "priority", + "labels" + }; + + for (const auto& field : copy_fields) + { + const auto f_it = state->request.find(field); + if (f_it != state->request.end()) + booking[field] = f_it.value(); + } + + task_state["category"] = state->request["category"]; + task_state["detail"] = state->request["description"]; + task_state["status"] = status; + + /// NOTE: This should be null, but the reason of populating this for + /// now is to provide an estimated start_time to the dashboard, so + /// sort by start time will still work + task_state["unix_millis_start_time"] = + booking["unix_millis_earliest_start_time"]; + + /// TODO: populate assignment from state + + nlohmann::json dispatch_json; + dispatch_json["status"] = status_to_string(state->status); + dispatch_json["errors"] = state->errors; + task_state["dispatch"] = dispatch_json; + + auto task_state_update = _task_state_update_json; + task_state_update["data"] = task_state; + + /// TODO: (YL) json validator for taskstateupdate + + if (broadcast_client) + broadcast_client->publish(task_state_update); + return task_state; + } + void move_to_finished(const std::string& task_id) { const auto active_it = active_dispatch_states.find(task_id); diff --git a/rmf_websocket/CMakeLists.txt b/rmf_websocket/CMakeLists.txt new file mode 100644 index 000000000..21a1227ec --- /dev/null +++ b/rmf_websocket/CMakeLists.txt @@ -0,0 +1,61 @@ +cmake_minimum_required(VERSION 3.5) + +project(rmf_websocket) + +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 17) +endif() +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + # we dont use add_compile_options with pedantic in message packages + # because the Python C extensions dont comply with it + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wpedantic") +endif() + +include(GNUInstallDirs) + +find_package(ament_cmake REQUIRED) +find_package(rmf_utils REQUIRED) +find_package(Eigen3 REQUIRED) +find_package(rclcpp REQUIRED) +find_package(nlohmann_json REQUIRED) +find_package(nlohmann_json_schema_validator_vendor REQUIRED) +find_package(websocketpp REQUIRED) + +file(GLOB_RECURSE core_lib_srcs "src/rmf_websocket/*.cpp") +add_library(rmf_websocket SHARED ${core_lib_srcs}) + +target_link_libraries(rmf_websocket + PUBLIC + ${rclcpp_LIBRARIES} + ${websocketpp_LIBRARIES} + rmf_utils::rmf_utils + nlohmann_json::nlohmann_json + nlohmann_json_schema_validator +) + +target_include_directories(rmf_websocket + PUBLIC + $ + $ + ${rclcpp_INCLUDE_DIRS} + ${WEBSOCKETPP_INCLUDE_DIR} +) + +ament_export_targets(rmf_websocket HAS_LIBRARY_TARGET) +ament_export_dependencies(rmf_traffic rclcpp nlohmann_json) + +#=============================================================================== +install( + DIRECTORY include/ + DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} +) + +install( + TARGETS rmf_websocket + EXPORT rmf_websocket + RUNTIME DESTINATION lib/rmf_websocket + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib +) + +ament_package() diff --git a/rmf_websocket/QUALITY_DECLARATION.md b/rmf_websocket/QUALITY_DECLARATION.md new file mode 100644 index 000000000..1e57e1e57 --- /dev/null +++ b/rmf_websocket/QUALITY_DECLARATION.md @@ -0,0 +1,153 @@ +This document is a declaration of software quality for the `rmf_websocket` package, based on the guidelines in [REP-2004](https://www.ros.org/reps/rep-2004.html). + +# `rmf_websocket` Quality Declaration + +The package `rmf_websocket` claims to be in the **Quality Level 4** category. + +Below are the rationales, notes, and caveats for this claim, organized by each requirement listed in the [Package Requirements for Quality Level 4 in REP-2004](https://www.ros.org/reps/rep-2004.html). + +## Version Policy [1] + +### Version Scheme [1.i] + +`rmf_websocket` uses `semver` according to the recommendation for ROS Core packages in the [ROS 2 Developer Guide](https://index.ros.org/doc/ros2/Contributing/Developer-Guide/#versioning). + +### Version Stability [1.ii] + +`rmf_websocket` is at a stable version, i.e. `>= 1.0.0`. +The current version can be found in its [package.xml](package.xml), and its change history can be found in its [CHANGELOG](CHANGELOG.rst). + +### Public API Declaration [1.iii] + +All symbols in the installed headers are considered part of the public API. + +All installed headers are in the `include` directory of the package. +Headers in any other folders are not installed and are considered private. + +All launch files in the installed `launch` directory are considered part of the public API. + +### API Stability Policy [1.iv] + +`rmf_websocket` will not break public API within a major version number. + +### ABI Stability Policy [1.v] + +`rmf_websocket` will not break public ABI within a major version number. + +### API and ABI Stability Within a Released ROS Distribution [1.vi] + +`rmf_websocket` will not break public API or ABI within a released ROS distribution, i.e. no major releases into the same ROS distribution once that ROS distribution is released. + +## Change Control Process [2] + +`rmf_websocket` follows the recommended guidelines for ROS Core packages in the [ROS 2 Developer Guide](https://index.ros.org/doc/ros2/Contributing/Developer-Guide/#package-requirements). + +### Change Requests [2.i] + +`rmf_websocket` requires that all changes occur through a pull request. + +### Contributor Origin [2.ii] + +`rmf_websocket` uses DCO as its confirmation of contributor origin policy. +More information can be found in [CONTRIBUTING](../CONTRIBUTING.md). + +### Peer Review Policy [2.iii] + +All pull requests must have at least 1 peer review. + +### Continuous Integration [2.iv] + +All pull requests must pass CI on all platforms supported by RMF. + +The most recent CI results can be seen on [the workflow page](https://github.com/open-rmf/rmf_ros2/actions). + +### Documentation Policy [2.v] + +All pull requests must resolve related documentation changes before merging. + +## Documentation [3] + +### Feature Documentation [3.i] + +`rmf_websocket` does not provide documentation. + +### Public API Documentation [3.ii] + +`rmf_websocket` documents its public API. +The documentation is not hosted. + +### License [3.iii] + +The license for `rmf_websocket` is Apache 2.0, the type is declared in the [package.xml](package.xml) manifest file, and a full copy of the license is in the repository level [LICENSE](../LICENSE) file. + +### Copyright Statement [3.iv] + +The copyright holders each provide a statement of copyright in each source code file in `rmf_websocket`. + +### Quality declaration document [3.v] + +This quality declaration is linked in the [README file](README.md). + +This quality declaration has not been externally peer-reviewed and is not registered on any Level 4 lists. + +## Testing [4] + +### Feature Testing [4.i] + +`rmf_websocket` does not have testing yet +### Public API Testing [4.ii] + +`rmf_websocket` does not have testing yet + +### Coverage [4.iii] + +`rmf_websocket` does not track coverage statistics. + +### Performance [4.iv] + +`rmf_websocket` does not test performance. + +### Linters and Static Analysis [4.v] + +`rmf_websocket` does not use the standard linters and static analysis tools for its CMake code to ensure it follows the [ROS 2 Developer Guide](https://index.ros.org/doc/ros2/Contributing/Developer-Guide/#linters). + +`rmf_websocket` uses a custom `uncrustify` configuration matching its coding style. + +## Dependencies [5] + +### Direct Runtime ROS Dependencies [5.i] + +Below are the required direct runtime ROS dependencies of `rmf_websocket` and their evaluations. + +#### rmf\_utils + +`rmf_utils` is [**Quality Level 4**](https://github.com/open-rmf/rmf_utils/blob/main/rmf_utils/QUALITY_DECLARATION.md). + +#### rclcpp + +`rclcpp` is [**Quality Level 1**](https://github.com/ros2/rclcpp/blob/main/rclcpp/QUALITY_DECLARATION.md). + +### Optional Direct Runtime ROS Dependencies [5.ii] + +`rmf_websocket` has no optional runtime ROS dependencies. + +### Direct Runtime non-ROS Dependency [5.iii] + +`rmf_websocket` has the following direct runtime non-ROS dependencies. + +#### websocketpp + +`websocketpp` is taken to be **Quality Level 3** due to its wide-spread use, history, use of CI, and use of testing. + +## Platform Support [6] + +### Target platforms [6.i] + +`rmf_websocket` does not support all of the tier 1 platforms as described in [REP-2000](https://www.ros.org/reps/rep-2000.html#support-tiers). +`rmf_websocket` supports ROS Foxy. + +## Security [7] + +### Vulnerability Disclosure Policy [7.i] + +This package conforms to the Vulnerability Disclosure Policy in [REP-2006](https://www.ros.org/reps/rep-2006.html). diff --git a/rmf_websocket/README.md b/rmf_websocket/README.md new file mode 100644 index 000000000..529ffd735 --- /dev/null +++ b/rmf_websocket/README.md @@ -0,0 +1,7 @@ +# rmf\_websocket package + +This package provides a websocker wrapper client library to interact with websocket server. + +## Quality Declaration + +This package claims to be in the **Quality Level 4** category. See the [Quality Declaration](QUALITY_DECLARATION.md) for more details. diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/BroadcastClient.hpp b/rmf_websocket/include/rmf_websocket/BroadcastClient.hpp similarity index 55% rename from rmf_fleet_adapter/src/rmf_fleet_adapter/BroadcastClient.hpp rename to rmf_websocket/include/rmf_websocket/BroadcastClient.hpp index 3ef1beecd..676207239 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/BroadcastClient.hpp +++ b/rmf_websocket/include/rmf_websocket/BroadcastClient.hpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2021 Open Source Robotics Foundation + * Copyright (C) 2022 Open Source Robotics Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,15 +15,13 @@ * */ -#ifndef SRC__RMF_FLEET_ADAPTER__BROADCASTCLIENT_HPP -#define SRC__RMF_FLEET_ADAPTER__BROADCASTCLIENT_HPP - -#include -#include +#ifndef RMF_WEBSOCKET__BROADCAST_CLIENT_HPP +#define RMF_WEBSOCKET__BROADCAST_CLIENT_HPP #include -#include +#include +#include #include #include @@ -32,7 +30,7 @@ #include #include -namespace rmf_fleet_adapter { +namespace rmf_websocket { //============================================================================== // A wrapper around a websocket client for broadcasting states and logs for // fleets, robots and tasks. A queue of json msgs is maintained and published @@ -40,17 +38,20 @@ namespace rmf_fleet_adapter { class BroadcastClient : public std::enable_shared_from_this { public: - using WebsocketClient = - websocketpp::client; - using WebsocketMessagePtr = WebsocketClient::message_ptr; - using ConnectionHDL = websocketpp::connection_hdl; - using Connections = std::set>; + using ProvideJsonUpdates = std::function()>; /// \param[in] uri /// "ws://localhost:9000" + /// + /// \param[in] node + /// + /// \param[in] on_open_connection_fn + /// Provided function callback will be called whenever the ws client + /// is connected to the server static std::shared_ptr make( const std::string& uri, - std::weak_ptr fleet_handle); + const std::shared_ptr& node, + ProvideJsonUpdates on_open_connection_fn = nullptr); // Publish a single message void publish(const nlohmann::json& msg); @@ -58,24 +59,13 @@ class BroadcastClient : public std::enable_shared_from_this // Publish a vector of messages void publish(const std::vector& msgs); - ~BroadcastClient(); + class Implementation; private: BroadcastClient(); - std::string _uri; - std::weak_ptr _fleet_handle; - WebsocketClient _client; - websocketpp::connection_hdl _hdl; - std::mutex _wait_mutex; - std::mutex _queue_mutex; - std::condition_variable _cv; - std::queue _queue; - std::thread _processing_thread; - std::thread _client_thread; - std::atomic_bool _connected; - std::atomic_bool _shutdown; + rmf_utils::unique_impl_ptr _pimpl; }; -} // namespace rmf_fleet_adapter +} // namespace rmf_websocket -#endif // SRC__RMF_FLEET_ADAPTER__BROADCASTCLIENT_HPP +#endif // RMF_WEBSOCKET__BROADCAST_CLIENT_HPP diff --git a/rmf_websocket/include/rmf_websocket/BroadcastServer.hpp b/rmf_websocket/include/rmf_websocket/BroadcastServer.hpp new file mode 100644 index 000000000..836f27401 --- /dev/null +++ b/rmf_websocket/include/rmf_websocket/BroadcastServer.hpp @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef RMF_WEBSOCKET__BROADCAST_SERVER_HPP +#define RMF_WEBSOCKET__BROADCAST_SERVER_HPP + +#include +#include + +#include + +#include +#include + +using Server = websocketpp::server; + +namespace rmf_websocket { + +//============================================================================== +/// This BroadcastServer is a wrapper of a websocket server. User need to +/// specify the api_msg_type, and provide a callback function. The provided +/// callback will be called when the specified msg_type is received. Note that +/// this will spawn a seperate thread to host the web socket server +class BroadcastServer +{ +public: + enum class ApiMsgType + { + TaskStateUpdate, + TaskLogUpdate, + FleetStateUpdate, + FleetLogUpdate, + }; + + using ApiMessageCallback = std::function; + + /// Add a callback to convert from a Description into an active Task. + /// + /// \param[in] port + /// server url port number + /// + /// \param[in] callback + /// callback function when the message is received + /// + /// \param[in] msg_selection + /// selected msg type to listen. Will listen to all msg if nullopt + /// + static std::shared_ptr make( + const int port, + ApiMessageCallback callback, + std::optional msg_selection = std::nullopt); + + /// Start Server + void start(); + + /// Stop Server + void stop(); + + /// Get the string name of the ApiMsgType + static const std::string to_string(const ApiMsgType& type); + + class Implementation; + +private: + BroadcastServer(); + rmf_utils::unique_impl_ptr _pimpl; +}; + +} // namespace rmf_websocket + +#endif // RMF_WEBSOCKET__BROADCAST_SERVER_HPP diff --git a/rmf_websocket/package.xml b/rmf_websocket/package.xml new file mode 100644 index 000000000..f35f34ac4 --- /dev/null +++ b/rmf_websocket/package.xml @@ -0,0 +1,28 @@ + + + + rmf_websocket + 2.0.0 + A package managing the websocket api endpoints in RMF system. + youliang + Marco A. GutiƩrrez + Apache License 2.0 + + ament_cmake + + rmf_utils + rclcpp + + libwebsocketpp-dev + nlohmann-json-dev + nlohmann_json_schema_validator_vendor + + eigen + + ament_cmake_catch2 + ament_cmake_uncrustify + + + ament_cmake + + diff --git a/rmf_websocket/src/rmf_websocket/BroadcastClient.cpp b/rmf_websocket/src/rmf_websocket/BroadcastClient.cpp new file mode 100644 index 000000000..62148d2a9 --- /dev/null +++ b/rmf_websocket/src/rmf_websocket/BroadcastClient.cpp @@ -0,0 +1,233 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +#include +#include +#include + +namespace rmf_websocket { + +//============================================================================== +class BroadcastClient::Implementation +{ +public: + using WebsocketClient = + websocketpp::client; + using WebsocketMessagePtr = WebsocketClient::message_ptr; + using ConnectionHDL = websocketpp::connection_hdl; + using Connections = std::set>; + + Implementation( + const std::string& uri, + const std::shared_ptr& node, + ProvideJsonUpdates get_json_updates_cb) + : _uri{std::move(uri)}, + _node{std::move(node)}, + _get_json_updates_cb{std::move(get_json_updates_cb)} + { + _shutdown = false; + _connected = false; + + // Initialize the Asio transport policy + _client.clear_access_channels(websocketpp::log::alevel::all); + _client.clear_error_channels(websocketpp::log::elevel::all); + _client.init_asio(); + _client.start_perpetual(); + _client_thread = std::thread( + [c = this]() + { + c->_client.run(); + }); + + _client.set_open_handler( + [c = this](websocketpp::connection_hdl) + { + c->_connected = true; + + if (c->_get_json_updates_cb) + c->publish(c->_get_json_updates_cb()); + + RCLCPP_INFO( + c->_node->get_logger(), + "BroadcastClient successfully connected to uri: [%s]", + c->_uri.c_str()); + }); + + _client.set_close_handler( + [c = this](websocketpp::connection_hdl) + { + c->_connected = false; + }); + + _client.set_fail_handler( + [c = this](websocketpp::connection_hdl) + { + c->_connected = false; + }); + + _processing_thread = std::thread( + [c = this]() + { + while (!c->_shutdown) + { + // Try to connect to the server if we are not connected yet + if (!c->_connected) + { + websocketpp::lib::error_code ec; + WebsocketClient::connection_ptr con = c->_client.get_connection( + c->_uri, ec); + + if (con) + { + c->_hdl = con->get_handle(); + c->_client.connect(con); + // TOD(YV): Without sending a test payload, ec seems to be 0 even + // when the client has not connected. Avoid sending this message. + c->_client.send(c->_hdl, "Hello", + websocketpp::frame::opcode::text, + ec); + } + + if (!con || ec) + { + RCLCPP_WARN( + c->_node->get_logger(), + "BroadcastClient unable to connect to [%s]. Please make sure " + "server is running. Error msg: %s", + c->_uri.c_str(), + ec.message().c_str()); + c->_connected = false; + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + continue; + } + + RCLCPP_INFO( + c->_node->get_logger(), + "BroadcastClient successfully connected to [%s]", + c->_uri.c_str()); + c->_connected = true; + } + + std::unique_lock lock(c->_wait_mutex); + c->_cv.wait(lock, + [c]() + { + return !c->_queue.empty(); + }); + + while (!c->_queue.empty()) + { + std::lock_guard lock(c->_queue_mutex); + websocketpp::lib::error_code ec; + const std::string& msg = c->_queue.front().dump(); + c->_client.send(c->_hdl, msg, websocketpp::frame::opcode::text, ec); + if (ec) + { + RCLCPP_ERROR( + c->_node->get_logger(), + "BroadcastClient unable to publish message: %s", + ec.message().c_str()); + // TODO(YV): Check if we should re-connect to server + break; + } + c->_queue.pop(); + } + } + }); + } + + // Publish a single message + void publish(const nlohmann::json& msg) + { + std::lock_guard lock(_queue_mutex); + _queue.push(msg); + _cv.notify_all(); + } + + // Publish a vector of messages + void publish(const std::vector& msgs) + { + std::lock_guard lock(_queue_mutex); + for (const auto& msg : msgs) + _queue.push(msg); + _cv.notify_all(); + } + + ~Implementation() + { + _shutdown = true; + if (_processing_thread.joinable()) + { + _processing_thread.join(); + } + if (_client_thread.joinable()) + { + _client_thread.join(); + } + _client.stop_perpetual(); + } + +private: + + // create pimpl + std::string _uri; + std::shared_ptr _node; + WebsocketClient _client; + websocketpp::connection_hdl _hdl; + std::mutex _wait_mutex; + std::mutex _queue_mutex; + std::condition_variable _cv; + std::queue _queue; + std::thread _processing_thread; + std::thread _client_thread; + std::atomic_bool _connected; + std::atomic_bool _shutdown; + ProvideJsonUpdates _get_json_updates_cb; +}; + +//============================================================================== +std::shared_ptr BroadcastClient::make( + const std::string& uri, + const std::shared_ptr& node, + ProvideJsonUpdates on_open_connection_fn) +{ + auto client = std::shared_ptr(new BroadcastClient()); + client->_pimpl = + rmf_utils::make_unique_impl( + uri, node, on_open_connection_fn); + return client; +} + +//============================================================================== +void BroadcastClient::publish(const nlohmann::json& msg) +{ + _pimpl->publish(msg); +} + +//============================================================================== +void BroadcastClient::publish(const std::vector& msgs) +{ + _pimpl->publish(msgs); +} + +//============================================================================== +BroadcastClient::BroadcastClient() +{ + // Do nothing +} + +} // namespace rmf_websocket diff --git a/rmf_fleet_adapter/test/mock/MockWebSocketServer.hpp b/rmf_websocket/src/rmf_websocket/BroadcastServer.cpp similarity index 67% rename from rmf_fleet_adapter/test/mock/MockWebSocketServer.hpp rename to rmf_websocket/src/rmf_websocket/BroadcastServer.cpp index 1f4b4ef0a..ff8d23f67 100644 --- a/rmf_fleet_adapter/test/mock/MockWebSocketServer.hpp +++ b/rmf_websocket/src/rmf_websocket/BroadcastServer.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2020 Open Source Robotics Foundation + * Copyright (C) 2022 Open Source Robotics Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,57 +15,32 @@ * */ -#ifndef RMF_FLEET_ADAPTER__TEST__MOCK__WEBSOCKETSERVER -#define RMF_FLEET_ADAPTER__TEST__MOCK__WEBSOCKETSERVER +#include #include #include #include - #include #include #include #include -using Server = websocketpp::server; - -namespace rmf_fleet_adapter_test { +namespace rmf_websocket { //============================================================================== -/// This MockWebSocketServer is a wrapper of a websocket server. User need to -/// specify the api_msg_type, and provide a callback function. The provided -/// callback will be called when the specified msg_type is received. Note that -/// this will spawn a seperate thread to host the web socket server -class MockWebSocketServer +class BroadcastServer::Implementation { public: - enum class ApiMsgType - { - TaskStateUpdate, - TaskLogUpdate, - FleetStateUpdate, - FleetLogUpdate, - }; - + using Server = websocketpp::server; using ApiMessageCallback = std::function; - /// Add a callback to convert from a Description into an active Task. - /// - /// \param[in] port - /// server url port number - /// - /// \param[in] callback - /// callback function when the message is received - /// - /// \param[in] msg_selection - /// selected msg type to listen. Will listen to all msg if nullopt - /// - MockWebSocketServer( + Implementation( const int port, ApiMessageCallback callback, - std::optional msg_selection = std::nullopt) - : _data(std::make_shared(std::move(callback), std::move(msg_selection))) + std::optional msg_selection) + : _data(std::make_shared(std::move(callback), + std::move(msg_selection))) { std::cout << "Run websocket server with port " << port << std::endl; try @@ -86,7 +61,6 @@ class MockWebSocketServer data->on_message(hdl, msg); }); - _data->echo_server.listen(port); _data->echo_server.start_accept(); } @@ -103,7 +77,7 @@ class MockWebSocketServer /// Start Server void start() { - std::cout << "Start Mock Server" << std::endl; + std::cout << "Start BroadcastServer" << std::endl; // Start the ASIO io_service run loop _server_thread = std::thread( [data = _data]() { data->echo_server.run(); }); @@ -112,7 +86,7 @@ class MockWebSocketServer /// Stop Server void stop() { - std::cout << "Stop Mock Server" << std::endl; + std::cout << "Stop BroadcastServer" << std::endl; if (_server_thread.joinable()) { _data->echo_server.stop_listening(); @@ -122,7 +96,7 @@ class MockWebSocketServer } } - ~MockWebSocketServer() + ~Implementation() { stop(); } @@ -165,9 +139,12 @@ class MockWebSocketServer if (selection) { const auto target_msg_type = to_string(*selection); - const auto type = msg_json.at("type"); - if (type == target_msg_type) - msg_callback(msg_json.at("data")); + const auto type_it = msg_json.find("type"); + if (type_it != msg_json.end()) + { + if (type_it.value() == target_msg_type) + msg_callback(msg_json.at("data")); + } } else msg_callback(msg_json); @@ -178,7 +155,41 @@ class MockWebSocketServer std::thread _server_thread; }; +//============================================================================== +std::shared_ptr BroadcastServer::make( + const int port, + ApiMessageCallback callback, + std::optional msg_selection) +{ + auto server = std::shared_ptr(new BroadcastServer()); + server->_pimpl = + rmf_utils::make_unique_impl( + port, callback, msg_selection); + return server; +} -} // namespace rmf_fleet_adapter_test +//============================================================================== +void BroadcastServer::start() +{ + _pimpl->start(); +} + +//============================================================================== +void BroadcastServer::stop() +{ + _pimpl->stop(); +} + +//============================================================================== +const std::string BroadcastServer::to_string(const ApiMsgType& type) +{ + return BroadcastServer::Implementation::to_string(type); +} + +//============================================================================== +BroadcastServer::BroadcastServer() +{ + // Do nothing +} -#endif // RMF_FLEET_ADAPTER__TEST__MOCK__WEBSOCKETSERVER +} // namespace rmf_websocket