From 16f7d378780f6a054de29c2b04bcfe4bd4f241cb Mon Sep 17 00:00:00 2001 From: Giang Nguyen Date: Sun, 16 Jul 2023 22:28:43 +0200 Subject: [PATCH] Add server socket --- .../include/multiverse_client.h | 8 + .../multiverse_client/multiverse_ros_base.py | 4 +- .../multiverse_services/query_data_service.py | 5 +- .../multiverse_client/multiverse_client.cpp | 169 +++++++++++++++--- .../multiverse_client/multiverse_socket.cpp | 9 +- .../launch/multiverse_server.launch | 2 +- .../src/multiverse_server.cpp | 77 +++++--- 7 files changed, 216 insertions(+), 58 deletions(-) diff --git a/multiverse_ws/src/multiverse_core/multiverse_client/include/multiverse_client.h b/multiverse_ws/src/multiverse_core/multiverse_client/include/multiverse_client.h index c356f8696..ca87d9ccb 100644 --- a/multiverse_ws/src/multiverse_core/multiverse_client/include/multiverse_client.h +++ b/multiverse_ws/src/multiverse_core/multiverse_client/include/multiverse_client.h @@ -21,9 +21,11 @@ #pragma once #include +#include enum class EMultiverseClientState : unsigned char { + None, StartConnection, BindSendMetaData, SendMetaData, @@ -128,6 +130,8 @@ class MultiverseClient virtual void clean_up() = 0; private: + bool connect_to_server(); + void run(); void send_meta_data(); @@ -139,6 +143,8 @@ class MultiverseClient void init_buffer(); protected: + std::string server_socket_addr = "tcp://127.0.0.1:7000"; + std::string host; std::string port; @@ -160,6 +166,8 @@ class MultiverseClient std::string receive_meta_data_str; private: + std::thread connect_to_server_thread; + std::string socket_addr; EMultiverseClientState flag; diff --git a/multiverse_ws/src/multiverse_core/multiverse_client/scripts/multiverse_client/multiverse_ros_base.py b/multiverse_ws/src/multiverse_core/multiverse_client/scripts/multiverse_client/multiverse_ros_base.py index 75de18208..83815a64b 100644 --- a/multiverse_ws/src/multiverse_core/multiverse_client/scripts/multiverse_client/multiverse_ros_base.py +++ b/multiverse_ws/src/multiverse_core/multiverse_client/scripts/multiverse_client/multiverse_ros_base.py @@ -24,7 +24,7 @@ def start(self) -> None: pass def _init_multiverse_socket(self): - self.__multiverse_socket = MultiverseSocket(self.use_thread) + self.__multiverse_socket = MultiverseSocket(self.use_thread, 'tcp://127.0.0.1:7000') def _init_send_meta_data(self) -> None: self._send_meta_data_dict = init_send_meta_data_dict() @@ -38,7 +38,7 @@ def _disconnect(self) -> None: def _set_send_meta_data(self): self.__multiverse_socket.set_send_meta_data(self._send_meta_data_dict) - def _get_receive_meta_data(self, time_out=1) -> bool: + def _get_receive_meta_data(self, time_out=float('inf')) -> bool: start = rospy.Time.now() while not rospy.is_shutdown(): self._receive_meta_data_dict = self.__multiverse_socket.get_receive_meta_data() diff --git a/multiverse_ws/src/multiverse_core/multiverse_client/scripts/multiverse_client/multiverse_services/query_data_service.py b/multiverse_ws/src/multiverse_core/multiverse_client/scripts/multiverse_client/multiverse_services/query_data_service.py index 7d691a7aa..7511fbabc 100644 --- a/multiverse_ws/src/multiverse_core/multiverse_client/scripts/multiverse_client/multiverse_services/query_data_service.py +++ b/multiverse_ws/src/multiverse_core/multiverse_client/scripts/multiverse_client/multiverse_services/query_data_service.py @@ -16,9 +16,9 @@ def __init__(self, **kwargs) -> None: def update_world(self) -> None: super()._init_send_meta_data() - self._send_meta_data_dict["receive"][""] = ["position", "quaternion"] + self._send_meta_data_dict["receive"][""] = [""] self._set_send_meta_data() - self._connect() + self._communicate(True) if self._get_receive_meta_data(): world_name = self._receive_meta_data_dict["world"] self.__worlds[world_name] = {} @@ -28,7 +28,6 @@ def update_world(self) -> None: for attribute_name in object_data: self.__worlds[world_name][""].add(attribute_name) self.__worlds[world_name][object_name].add(attribute_name) - self._disconnect() def _bind_send_meta_data(self, request: SocketRequest) -> None: world_name = "world" if request.world == "" else request.world diff --git a/multiverse_ws/src/multiverse_core/multiverse_client/src/multiverse_client/multiverse_client.cpp b/multiverse_ws/src/multiverse_core/multiverse_client/src/multiverse_client/multiverse_client.cpp index 63a177ffc..3b34b3652 100644 --- a/multiverse_ws/src/multiverse_core/multiverse_client/src/multiverse_client/multiverse_client.cpp +++ b/multiverse_ws/src/multiverse_core/multiverse_client/src/multiverse_client/multiverse_client.cpp @@ -23,7 +23,7 @@ #include #include #include -#include +#include std::map attribute_map = { {"", 0}, @@ -37,14 +37,49 @@ std::map attribute_map = { {"force", 3}, {"torque", 3}}; +bool MultiverseClient::connect_to_server() +{ + zmq_disconnect(socket_client, socket_addr.c_str()); + + if (should_shut_down) + { + return false; + } + + zmq_connect(socket_client, server_socket_addr.c_str()); + + zmq_send(socket_client, socket_addr.c_str(), socket_addr.size(), 0); + + std::string receive_socket_addr; + try + { + zmq_msg_t message; + zmq_msg_init(&message); + zmq_msg_recv(&message, socket_client, 0); + receive_socket_addr = static_cast(zmq_msg_data(&message)), zmq_msg_size(&message); + zmq_msg_close(&message); + } + catch (const zmq::error_t &e) + { + should_shut_down = true; + printf("[Client] %s, prepares to disconnect from server socket %s.", e.what(), server_socket_addr.c_str()); + } + + zmq_disconnect(socket_client, server_socket_addr.c_str()); + + return socket_addr.compare(receive_socket_addr) == 0; +} + void MultiverseClient::connect(const std::string &in_host, const std::string &in_port) { - clean_up(); + flag = EMultiverseClientState::None; host = in_host; port = in_port; socket_addr = host + ":" + port; + clean_up(); + if (!init_objects()) { return; @@ -53,11 +88,23 @@ void MultiverseClient::connect(const std::string &in_host, const std::string &in context = zmq_ctx_new(); socket_client = zmq_socket(context, ZMQ_REQ); - flag = EMultiverseClientState::StartConnection; + if (connect_to_server_thread.joinable()) + { + connect_to_server_thread.join(); + } + + connect_to_server_thread = std::thread([&]() + { + if (!connect_to_server()) + { + return; + } + + flag = EMultiverseClientState::StartConnection; - printf("[Client %s] Opened the socket %s.\n", port.c_str(), socket_addr.c_str()); + printf("[Client %s] Opened the socket %s.\n", port.c_str(), socket_addr.c_str()); - run(); + run(); }); } double MultiverseClient::get_time_now() @@ -86,8 +133,6 @@ void MultiverseClient::run() printf("[Client %s] Sending meta data to the server:\n%s", port.c_str(), send_meta_data_str.c_str()); start_meta_data_thread(); - - flag = EMultiverseClientState::BindReceiveMetaData; return; case EMultiverseClientState::SendMetaData: @@ -99,22 +144,43 @@ void MultiverseClient::run() case EMultiverseClientState::ReceiveMetaData: receive_meta_data(); - if (receive_meta_data_str.empty() || !reader.parse(receive_meta_data_str, receive_meta_data_json)) + if (receive_meta_data_str.empty() || + !reader.parse(receive_meta_data_str, receive_meta_data_json) || + !receive_meta_data_json.isMember("time") || + receive_meta_data_json["time"].asDouble() < 0) { printf("[Client %s] The socket %s from the server has been terminated, resending the meta data.\n", port.c_str(), socket_addr.c_str()); - zmq_disconnect(socket_client, socket_addr.c_str()); - zmq_connect(socket_client, socket_addr.c_str()); - flag = EMultiverseClientState::SendMetaData; + + zmq_sleep(1); // Wait for the server to terminate completely + + if (connect_to_server()) + { + zmq_connect(socket_client, socket_addr.c_str()); + flag = EMultiverseClientState::SendMetaData; + } + else + { + flag = EMultiverseClientState::None; + } } else if (check_buffer_size()) { init_buffer(); flag = EMultiverseClientState::BindReceiveMetaData; - break; } else { - flag = EMultiverseClientState::StartConnection; + zmq_sleep(1); // Wait for the server to terminate completely + + if (connect_to_server()) + { + zmq_connect(socket_client, socket_addr.c_str()); + flag = EMultiverseClientState::SendMetaData; + } + else + { + flag = EMultiverseClientState::None; + } } break; @@ -133,7 +199,7 @@ void MultiverseClient::run() flag = EMultiverseClientState::BindSendData; break; - case EMultiverseClientState::BindSendData: + case EMultiverseClientState::BindSendData: bind_send_data(); flag = EMultiverseClientState::SendData; @@ -152,7 +218,28 @@ void MultiverseClient::run() if (std::isnan(*receive_buffer) || *receive_buffer < 0) { printf("[Client %s] The socket %s from the server has been terminated, returning to resend the meta data.\n", port.c_str(), socket_addr.c_str()); - flag = EMultiverseClientState::StartConnection; + + if (connect_to_server_thread.joinable()) + { + connect_to_server_thread.join(); + } + + connect_to_server_thread = std::thread([&]() + { + zmq_sleep(1); // Wait for the server to terminate completely + + if (!connect_to_server()) + { + return; + } + + flag = EMultiverseClientState::StartConnection; + + wait_for_meta_data_thread_finish(); + + run(); }); + + return; } else { @@ -165,6 +252,9 @@ void MultiverseClient::run() flag = EMultiverseClientState::BindSendData; return; + + default: + return; } } @@ -218,13 +308,25 @@ void MultiverseClient::receive_meta_data() bool MultiverseClient::check_buffer_size() { + bool skip_compare = false; + std::map request_buffer_sizes = {{"send", 1}, {"receive", 1}}; for (std::pair &request_buffer_size : request_buffer_sizes) { for (const std::string &object_name : send_meta_data_json[request_buffer_size.first].getMemberNames()) { + if (strcmp(object_name.c_str(), "") == 0) + { + skip_compare = true; + break; + } for (const Json::Value &attribute : send_meta_data_json[request_buffer_size.first][object_name]) { + if (strcmp(attribute.asString().c_str(), "") == 0) + { + skip_compare = true; + break; + } request_buffer_size.second += attribute_map[attribute.asString()]; } } @@ -242,7 +344,7 @@ bool MultiverseClient::check_buffer_size() } } - if (!send_meta_data_json["receive"].isMember("") && + if (!skip_compare && (response_buffer_sizes["send"] != request_buffer_sizes["send"] || response_buffer_sizes["receive"] != request_buffer_sizes["receive"])) { printf("[Client %s] Failed to initialize the buffers %s: send_buffer_size(server = %ld, client = %ld), receive_buffer_size(server = %ld, client = %ld).\n", @@ -268,19 +370,33 @@ void MultiverseClient::init_buffer() void MultiverseClient::communicate(const bool resend_meta_data) { - if (resend_meta_data && flag == EMultiverseClientState::BindSendData) + if (should_shut_down) { - clean_up(); - flag = EMultiverseClientState::BindSendMetaData; - run(); + return; } - else if (!resend_meta_data && flag == EMultiverseClientState::BindSendData) + + if (resend_meta_data) { - run(); + if (flag == EMultiverseClientState::BindSendData) + { + clean_up(); + flag = EMultiverseClientState::BindSendMetaData; + run(); + } + else if (flag == EMultiverseClientState::InitSendAndReceiveData) + { + wait_for_meta_data_thread_finish(); + clean_up(); + flag = EMultiverseClientState::BindSendMetaData; + run(); + } } - else if (flag == EMultiverseClientState::InitSendAndReceiveData) + else { - run(); + if (flag == EMultiverseClientState::BindSendData || flag == EMultiverseClientState::InitSendAndReceiveData) + { + run(); + } } } @@ -291,4 +407,9 @@ void MultiverseClient::disconnect() zmq_ctx_shutdown(context); wait_for_meta_data_thread_finish(); + + if (connect_to_server_thread.joinable()) + { + connect_to_server_thread.join(); + } } \ No newline at end of file diff --git a/multiverse_ws/src/multiverse_core/multiverse_client/src/multiverse_client/multiverse_socket.cpp b/multiverse_ws/src/multiverse_core/multiverse_client/src/multiverse_client/multiverse_socket.cpp index 3e9136ac7..556110a9c 100644 --- a/multiverse_ws/src/multiverse_core/multiverse_client/src/multiverse_client/multiverse_socket.cpp +++ b/multiverse_ws/src/multiverse_core/multiverse_client/src/multiverse_client/multiverse_socket.cpp @@ -20,8 +20,6 @@ #include "multiverse_client.h" -#include - #include #include #include @@ -29,8 +27,9 @@ class MultiverseSocket final : public MultiverseClient { public: - MultiverseSocket(const bool use_thread) : use_thread(use_thread) + MultiverseSocket(const bool use_thread, const std::string &in_server_socket_addr = "tcp:127.0.0.1:7000") : use_thread(use_thread) { + server_socket_addr = in_server_socket_addr; } ~MultiverseSocket() @@ -105,6 +104,7 @@ class MultiverseSocket final : public MultiverseClient void bind_send_meta_data() override { + pybind11::gil_scoped_acquire acquire; send_meta_data_json.clear(); send_meta_data_json["world"] = send_meta_data_dict.contains("world") ? send_meta_data_dict["world"].cast() : "world"; @@ -129,6 +129,7 @@ class MultiverseSocket final : public MultiverseClient } } } + pybind11::gil_scoped_release release; } void bind_receive_meta_data() override @@ -226,7 +227,7 @@ PYBIND11_MODULE(multiverse_socket, handle) .def("disconnect", &MultiverseClient::disconnect); pybind11::class_(handle, "MultiverseSocket", pybind11::is_final()) - .def(pybind11::init()) + .def(pybind11::init()) .def("set_send_meta_data", &MultiverseSocket::set_send_meta_data) .def("get_receive_meta_data", &MultiverseSocket::get_receive_meta_data) .def("set_send_data", &MultiverseSocket::set_send_data) diff --git a/multiverse_ws/src/multiverse_core/multiverse_server/launch/multiverse_server.launch b/multiverse_ws/src/multiverse_core/multiverse_server/launch/multiverse_server.launch index 27e4e1092..c444ebc9a 100644 --- a/multiverse_ws/src/multiverse_core/multiverse_server/launch/multiverse_server.launch +++ b/multiverse_ws/src/multiverse_core/multiverse_server/launch/multiverse_server.launch @@ -1,6 +1,6 @@ + args="tcp://127.0.0.1:7000" /> \ No newline at end of file diff --git a/multiverse_ws/src/multiverse_core/multiverse_server/src/multiverse_server.cpp b/multiverse_ws/src/multiverse_core/multiverse_server/src/multiverse_server.cpp index 9319fe08a..5b01a23ef 100644 --- a/multiverse_ws/src/multiverse_core/multiverse_server/src/multiverse_server.cpp +++ b/multiverse_ws/src/multiverse_core/multiverse_server/src/multiverse_server.cpp @@ -108,8 +108,6 @@ std::map>> handedness_scal {{"rhs", {1.0, 1.0, 1.0}}, {"lhs", {1.0, -1.0, 1.0}}}}}; -std::vector workers; - std::mutex mtx; std::map, bool>>>> worlds; @@ -132,15 +130,15 @@ class MultiverseServer public: MultiverseServer(const std::string &socket_addr) : socket_addr(socket_addr) { - socket_server = zmq::socket_t(context, zmq::socket_type::rep); - socket_server.bind(socket_addr); + socket = zmq::socket_t(context, zmq::socket_type::rep); + socket.bind(socket_addr); sockets_need_clean_up[socket_addr] = false; - ROS_INFO("[Server] Bind to socket [%s].", socket_addr.c_str()); + ROS_INFO("[Server] Bind to socket %s.", socket_addr.c_str()); } ~MultiverseServer() { - ROS_INFO("[Server] Close socket [%s].", socket_addr.c_str()); + ROS_INFO("[Server] Close socket %s.", socket_addr.c_str()); if (send_buffer != nullptr) { @@ -282,7 +280,7 @@ class MultiverseServer } ROS_INFO("[Server] Unbind from socket %s.", socket_addr.c_str()); - socket_server.unbind(socket_addr); + socket.unbind(socket_addr); } } @@ -301,7 +299,7 @@ class MultiverseServer try { sockets_need_clean_up[socket_addr] = false; - socket_server.recv(message, zmq::recv_flags::none); + socket.recv(message, zmq::recv_flags::none); sockets_need_clean_up[socket_addr] = true; } catch (const zmq::error_t &e) @@ -585,7 +583,7 @@ class MultiverseServer // Send buffer sizes and send_data (if exists) over ZMQ zmq::message_t response_message(message_str.size()); memcpy(response_message.data(), message_str.data(), message_str.size()); - socket_server.send(response_message, zmq::send_flags::none); + socket.send(response_message, zmq::send_flags::none); send_buffer = (double *)calloc(send_buffer_size, sizeof(double)); receive_buffer = (double *)calloc(receive_buffer_size, sizeof(double)); @@ -597,14 +595,14 @@ class MultiverseServer try { sockets_need_clean_up[socket_addr] = false; - socket_server.recv(message, zmq::recv_flags::none); + socket.recv(message, zmq::recv_flags::none); sockets_need_clean_up[socket_addr] = true; memcpy(send_buffer, message.data(), send_buffer_size * sizeof(double)); } catch (const zmq::error_t &e) { should_shut_down = true; - ROS_INFO("[Server] %s, socket at [%s] prepares to close.", e.what(), socket_addr.c_str()); + ROS_INFO("[Server] %s, socket at %s prepares to close.", e.what(), socket_addr.c_str()); } } @@ -632,7 +630,7 @@ class MultiverseServer const int now = get_time_now(); if (now - start > 1) { - ROS_INFO("[Server] Socket at [%s] is waiting for data of [%s][%s] to be sent.", socket_addr.c_str(), object_name.c_str(), attribute_name.c_str()); + ROS_INFO("[Server] Socket at %s is waiting for data of [%s][%s] to be sent.", socket_addr.c_str(), object_name.c_str(), attribute_name.c_str()); start = now; } } @@ -676,7 +674,7 @@ class MultiverseServer receive_buffer[0] = should_shut_down ? -1.0 : get_time_now(); zmq::message_t reply_data(receive_buffer_size * sizeof(double)); memcpy(reply_data.data(), receive_buffer, receive_buffer_size * sizeof(double)); - socket_server.send(reply_data, zmq::send_flags::none); + socket.send(reply_data, zmq::send_flags::none); } private: @@ -688,7 +686,7 @@ class MultiverseServer std::string socket_addr; - zmq::socket_t socket_server; + zmq::socket_t socket; Json::Value send_meta_data_json; @@ -721,10 +719,44 @@ class MultiverseServer bool continue_state = false; }; -void start_multiverse_server(int port) +void start_multiverse_server(const std::string &server_socket_addr) { - MultiverseServer multiverse_server("tcp://127.0.0.1:" + std::to_string(port)); - multiverse_server.start(); + std::map workers; + zmq::socket_t socket = zmq::socket_t(context, zmq::socket_type::rep); + socket.bind(server_socket_addr); + ROS_INFO("[Server] Create server socket %s.", server_socket_addr.c_str()); + + zmq::message_t message; + std::string message_str; + while (!should_shut_down) + { + try + { + socket.recv(message, zmq::recv_flags::none); + message_str = message.to_string(); + } + catch (const zmq::error_t &e) + { + should_shut_down = true; + ROS_INFO("[Server] %s, server socket %s prepares to close.", e.what(), server_socket_addr.c_str()); + break; + } + + if (workers.count(message_str) == 0) + { + workers[message_str] = std::thread([&]() + { MultiverseServer multiverse_server(message_str); multiverse_server.start(); }); + } + + socket.send(message, zmq::send_flags::none); + + zmq_sleep(0.1); + } + + for (std::pair &worker : workers) + { + worker.second.join(); + } } int main(int argc, char **argv) @@ -735,12 +767,9 @@ int main(int argc, char **argv) ROS_INFO("[Server] Interrupt signal (%d) received, wait for 1s then shutdown.", signum); should_shut_down = true; }); - ros::init(argc, argv, "state_server"); + ros::init(argc, argv, "multiverse_server"); - for (size_t thread_num = 0; thread_num < argc - 1; thread_num++) - { - workers.emplace_back(start_multiverse_server, std::stoi(argv[thread_num + 1])); - } + std::thread multiverse_server_thread(start_multiverse_server, std::string(argv[1])); while (!should_shut_down) { @@ -764,8 +793,8 @@ int main(int argc, char **argv) context.shutdown(); - for (std::thread &worker : workers) + if (multiverse_server_thread.joinable()) { - worker.join(); + multiverse_server_thread.join(); } }