Skip to content

Commit

Permalink
Add server socket
Browse files Browse the repository at this point in the history
  • Loading branch information
HoangGiang93 committed Jul 16, 2023
1 parent 5e4c112 commit 16f7d37
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
#pragma once

#include <jsoncpp/json/json.h>
#include <thread>

enum class EMultiverseClientState : unsigned char
{
None,
StartConnection,
BindSendMetaData,
SendMetaData,
Expand Down Expand Up @@ -128,6 +130,8 @@ class MultiverseClient
virtual void clean_up() = 0;

private:
bool connect_to_server();

void run();

void send_meta_data();
Expand All @@ -139,6 +143,8 @@ class MultiverseClient
void init_buffer();

protected:
std::string server_socket_addr = "tcp://127.0.0.1:7000";

std::string host;

std::string port;
Expand All @@ -160,6 +166,8 @@ class MultiverseClient
std::string receive_meta_data_str;

private:
std::thread connect_to_server_thread;

std::string socket_addr;

EMultiverseClientState flag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def start(self) -> None:
pass

def _init_multiverse_socket(self):
self.__multiverse_socket = MultiverseSocket(self.use_thread)
self.__multiverse_socket = MultiverseSocket(self.use_thread, 'tcp://127.0.0.1:7000')

def _init_send_meta_data(self) -> None:
self._send_meta_data_dict = init_send_meta_data_dict()
Expand All @@ -38,7 +38,7 @@ def _disconnect(self) -> None:
def _set_send_meta_data(self):
self.__multiverse_socket.set_send_meta_data(self._send_meta_data_dict)

def _get_receive_meta_data(self, time_out=1) -> bool:
def _get_receive_meta_data(self, time_out=float('inf')) -> bool:
start = rospy.Time.now()
while not rospy.is_shutdown():
self._receive_meta_data_dict = self.__multiverse_socket.get_receive_meta_data()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ def __init__(self, **kwargs) -> None:

def update_world(self) -> None:
super()._init_send_meta_data()
self._send_meta_data_dict["receive"][""] = ["position", "quaternion"]
self._send_meta_data_dict["receive"][""] = [""]
self._set_send_meta_data()
self._connect()
self._communicate(True)
if self._get_receive_meta_data():
world_name = self._receive_meta_data_dict["world"]
self.__worlds[world_name] = {}
Expand All @@ -28,7 +28,6 @@ def update_world(self) -> None:
for attribute_name in object_data:
self.__worlds[world_name][""].add(attribute_name)
self.__worlds[world_name][object_name].add(attribute_name)
self._disconnect()

def _bind_send_meta_data(self, request: SocketRequest) -> None:
world_name = "world" if request.world == "" else request.world
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <algorithm>
#include <chrono>
#include <cmath>
#include <zmq.h>
#include <zmq.hpp>

std::map<std::string, size_t> attribute_map = {
{"", 0},
Expand All @@ -37,14 +37,49 @@ std::map<std::string, size_t> attribute_map = {
{"force", 3},
{"torque", 3}};

bool MultiverseClient::connect_to_server()
{
zmq_disconnect(socket_client, socket_addr.c_str());

if (should_shut_down)
{
return false;
}

zmq_connect(socket_client, server_socket_addr.c_str());

zmq_send(socket_client, socket_addr.c_str(), socket_addr.size(), 0);

std::string receive_socket_addr;
try
{
zmq_msg_t message;
zmq_msg_init(&message);
zmq_msg_recv(&message, socket_client, 0);
receive_socket_addr = static_cast<char *>(zmq_msg_data(&message)), zmq_msg_size(&message);
zmq_msg_close(&message);
}
catch (const zmq::error_t &e)
{
should_shut_down = true;
printf("[Client] %s, prepares to disconnect from server socket %s.", e.what(), server_socket_addr.c_str());
}

zmq_disconnect(socket_client, server_socket_addr.c_str());

return socket_addr.compare(receive_socket_addr) == 0;
}

void MultiverseClient::connect(const std::string &in_host, const std::string &in_port)
{
clean_up();
flag = EMultiverseClientState::None;

host = in_host;
port = in_port;
socket_addr = host + ":" + port;

clean_up();

if (!init_objects())
{
return;
Expand All @@ -53,11 +88,23 @@ void MultiverseClient::connect(const std::string &in_host, const std::string &in
context = zmq_ctx_new();
socket_client = zmq_socket(context, ZMQ_REQ);

flag = EMultiverseClientState::StartConnection;
if (connect_to_server_thread.joinable())
{
connect_to_server_thread.join();
}

connect_to_server_thread = std::thread([&]()
{
if (!connect_to_server())
{
return;
}

flag = EMultiverseClientState::StartConnection;

printf("[Client %s] Opened the socket %s.\n", port.c_str(), socket_addr.c_str());
printf("[Client %s] Opened the socket %s.\n", port.c_str(), socket_addr.c_str());

run();
run(); });
}

double MultiverseClient::get_time_now()
Expand Down Expand Up @@ -86,8 +133,6 @@ void MultiverseClient::run()
printf("[Client %s] Sending meta data to the server:\n%s", port.c_str(), send_meta_data_str.c_str());

start_meta_data_thread();

flag = EMultiverseClientState::BindReceiveMetaData;
return;

case EMultiverseClientState::SendMetaData:
Expand All @@ -99,22 +144,43 @@ void MultiverseClient::run()
case EMultiverseClientState::ReceiveMetaData:
receive_meta_data();

if (receive_meta_data_str.empty() || !reader.parse(receive_meta_data_str, receive_meta_data_json))
if (receive_meta_data_str.empty() ||
!reader.parse(receive_meta_data_str, receive_meta_data_json) ||
!receive_meta_data_json.isMember("time") ||
receive_meta_data_json["time"].asDouble() < 0)
{
printf("[Client %s] The socket %s from the server has been terminated, resending the meta data.\n", port.c_str(), socket_addr.c_str());
zmq_disconnect(socket_client, socket_addr.c_str());
zmq_connect(socket_client, socket_addr.c_str());
flag = EMultiverseClientState::SendMetaData;

zmq_sleep(1); // Wait for the server to terminate completely

if (connect_to_server())
{
zmq_connect(socket_client, socket_addr.c_str());
flag = EMultiverseClientState::SendMetaData;
}
else
{
flag = EMultiverseClientState::None;
}
}
else if (check_buffer_size())
{
init_buffer();
flag = EMultiverseClientState::BindReceiveMetaData;
break;
}
else
{
flag = EMultiverseClientState::StartConnection;
zmq_sleep(1); // Wait for the server to terminate completely

if (connect_to_server())
{
zmq_connect(socket_client, socket_addr.c_str());
flag = EMultiverseClientState::SendMetaData;
}
else
{
flag = EMultiverseClientState::None;
}
}
break;

Expand All @@ -133,7 +199,7 @@ void MultiverseClient::run()
flag = EMultiverseClientState::BindSendData;
break;

case EMultiverseClientState::BindSendData:
case EMultiverseClientState::BindSendData:
bind_send_data();

flag = EMultiverseClientState::SendData;
Expand All @@ -152,7 +218,28 @@ void MultiverseClient::run()
if (std::isnan(*receive_buffer) || *receive_buffer < 0)
{
printf("[Client %s] The socket %s from the server has been terminated, returning to resend the meta data.\n", port.c_str(), socket_addr.c_str());
flag = EMultiverseClientState::StartConnection;

if (connect_to_server_thread.joinable())
{
connect_to_server_thread.join();
}

connect_to_server_thread = std::thread([&]()
{
zmq_sleep(1); // Wait for the server to terminate completely

if (!connect_to_server())
{
return;
}

flag = EMultiverseClientState::StartConnection;

wait_for_meta_data_thread_finish();

run(); });

return;
}
else
{
Expand All @@ -165,6 +252,9 @@ void MultiverseClient::run()

flag = EMultiverseClientState::BindSendData;
return;

default:
return;
}
}

Expand Down Expand Up @@ -218,13 +308,25 @@ void MultiverseClient::receive_meta_data()

bool MultiverseClient::check_buffer_size()
{
bool skip_compare = false;

std::map<std::string, size_t> request_buffer_sizes = {{"send", 1}, {"receive", 1}};
for (std::pair<const std::string, size_t> &request_buffer_size : request_buffer_sizes)
{
for (const std::string &object_name : send_meta_data_json[request_buffer_size.first].getMemberNames())
{
if (strcmp(object_name.c_str(), "") == 0)
{
skip_compare = true;
break;
}
for (const Json::Value &attribute : send_meta_data_json[request_buffer_size.first][object_name])
{
if (strcmp(attribute.asString().c_str(), "") == 0)
{
skip_compare = true;
break;
}
request_buffer_size.second += attribute_map[attribute.asString()];
}
}
Expand All @@ -242,7 +344,7 @@ bool MultiverseClient::check_buffer_size()
}
}

if (!send_meta_data_json["receive"].isMember("") &&
if (!skip_compare &&
(response_buffer_sizes["send"] != request_buffer_sizes["send"] || response_buffer_sizes["receive"] != request_buffer_sizes["receive"]))
{
printf("[Client %s] Failed to initialize the buffers %s: send_buffer_size(server = %ld, client = %ld), receive_buffer_size(server = %ld, client = %ld).\n",
Expand All @@ -268,19 +370,33 @@ void MultiverseClient::init_buffer()

void MultiverseClient::communicate(const bool resend_meta_data)
{
if (resend_meta_data && flag == EMultiverseClientState::BindSendData)
if (should_shut_down)
{
clean_up();
flag = EMultiverseClientState::BindSendMetaData;
run();
return;
}
else if (!resend_meta_data && flag == EMultiverseClientState::BindSendData)

if (resend_meta_data)
{
run();
if (flag == EMultiverseClientState::BindSendData)
{
clean_up();
flag = EMultiverseClientState::BindSendMetaData;
run();
}
else if (flag == EMultiverseClientState::InitSendAndReceiveData)
{
wait_for_meta_data_thread_finish();
clean_up();
flag = EMultiverseClientState::BindSendMetaData;
run();
}
}
else if (flag == EMultiverseClientState::InitSendAndReceiveData)
else
{
run();
if (flag == EMultiverseClientState::BindSendData || flag == EMultiverseClientState::InitSendAndReceiveData)
{
run();
}
}
}

Expand All @@ -291,4 +407,9 @@ void MultiverseClient::disconnect()
zmq_ctx_shutdown(context);

wait_for_meta_data_thread_finish();

if (connect_to_server_thread.joinable())
{
connect_to_server_thread.join();
}
}
Loading

0 comments on commit 16f7d37

Please sign in to comment.