Skip to content

Commit

Permalink
Make spawning possible
Browse files Browse the repository at this point in the history
  • Loading branch information
HoangGiang93 committed Jul 20, 2023
1 parent 5ceea0b commit 255bb49
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,9 @@
#pragma once

#include <string>
#include <atomic>

enum class EMultiverseClientState : unsigned char
{
None,
StartConnection,
BindRequestMetaData,
SendRequestMetaData,
ReceiveResponseMetaData,
BindResponseMetaData,
InitSendAndReceiveData,
BindSendData,
SendData,
ReceiveData,
BindReceiveData
};

enum class EMultiverseClientState : unsigned char;
class MultiverseClient
{
public:
Expand All @@ -56,30 +43,20 @@ class MultiverseClient
* @brief start the client
*
*/
void start(const bool wait_for_server = false);
void start();

/**
* @brief Communicate with the server
*
*/
void communicate(const bool resend_request_meta_data = false);
virtual void communicate(const bool resend_request_meta_data = false);

/**
* @brief Send close signal to the server
*
*/
void disconnect();

/**
* @brief Get the State
*
* @return EMultiverseClientState
*/
EMultiverseClientState GetState() const
{
return flag;
}

public:
/**
* @brief Get the current time in time_unit
Expand Down Expand Up @@ -223,11 +200,11 @@ class MultiverseClient

std::string response_meta_data_str;

std::atomic<EMultiverseClientState> flag;

private:
std::string socket_addr;

EMultiverseClientState flag;

void *context;

void *client_socket;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,42 @@
#!/usr/bin/env python3

import rospy
from typing import List
from typing import List, Dict, Any
from multiverse_client.multiverse_ros_base import MultiverseRosBase


class MultiverseRosPublisher(MultiverseRosBase):
_use_meta_data = False

def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.use_thread = True
self.rate = rospy.Rate(float(kwargs.get("rate", 60)))

def start(self) -> None:
self._init_multiverse_socket()
self._set_request_meta_data()
self._connect()

response_meta_data = self._get_response_meta_data()

self._construct_rosmsg(response_meta_data)

while not rospy.is_shutdown():
self._communicate()
receive_data = self._get_receive_data()
self._bind_rosmsg(receive_data)
if not rospy.is_shutdown():
self._publish()
self.rate.sleep()
if self._use_meta_data:
while not rospy.is_shutdown():
self._communicate(True)
self._construct_rosmsg(self._get_response_meta_data())
if not rospy.is_shutdown():
self._publish()
self.rate.sleep()

else:
self._construct_rosmsg(self._get_response_meta_data())

while not rospy.is_shutdown():
self._communicate()
self._bind_rosmsg(self._get_receive_data())
if not rospy.is_shutdown():
self._publish()
self.rate.sleep()

self._disconnect()

def _construct_rosmsg(self, response_meta_data_dict: dict) -> None:
def _construct_rosmsg(self, response_meta_data_dict: Dict) -> None:
pass

def _bind_rosmsg(self, receive_data: List[float]) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
class tf_publisher(MultiverseRosPublisher):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._use_meta_data = True
self.__tf_broadcaster = TransformBroadcaster()
self.__refresh_time = rospy.Time.now()
self.__tf_msgs = []
self.__root_frame_id = rospy.get_param("multiverse/root_frame_id") if rospy.has_param("multiverse/root_frame_id") else "map"
self.__seq = 0

def _init_request_meta_data(self) -> None:
super()._init_request_meta_data()
Expand All @@ -22,34 +25,34 @@ def _construct_rosmsg(self, response_meta_data_dict: dict) -> None:
self.object_names = []
if response_meta_data_dict.get("receive") is None:
return

self.object_names = response_meta_data_dict["receive"].keys()
root_frame_id = rospy.get_param("multiverse/root_frame_id") if rospy.has_param("multiverse/root_frame_id") else "map"

self.__tf_msgs = []

for object_name in self.object_names:
tf_data = response_meta_data_dict["receive"][object_name]
tf_msg = TransformStamped()
tf_msg.header.frame_id = root_frame_id
tf_msg.header.frame_id = self.__root_frame_id
tf_msg.header.stamp = rospy.Time.now()
tf_msg.header.seq = self.__seq
tf_msg.child_frame_id = object_name

tf_msg.transform.translation.x = tf_data["position"][0]
tf_msg.transform.translation.y = tf_data["position"][1]
tf_msg.transform.translation.z = tf_data["position"][2]
quat = numpy.array([tf_data["quaternion"][0], tf_data["quaternion"][1], tf_data["quaternion"][2], tf_data["quaternion"][3]])
try:
quat = quat / numpy.linalg.norm(quat)
except TypeError as e:
rospy.logwarn(e)
continue
tf_msg.transform.rotation.w = quat[0]
tf_msg.transform.rotation.x = quat[1]
tf_msg.transform.rotation.y = quat[2]
tf_msg.transform.rotation.z = quat[3]
self.__tf_msgs.append(tf_msg)

def _bind_rosmsg(self, receive_data: List[float]) -> None:
for i, _ in enumerate(self.object_names):
self.__tf_msgs[i].header.stamp = rospy.Time.now()
self.__tf_msgs[i].header.seq += 1
self.__tf_msgs[i].transform.translation.x = receive_data[7 * i + 1]
self.__tf_msgs[i].transform.translation.y = receive_data[7 * i + 2]
self.__tf_msgs[i].transform.translation.z = receive_data[7 * i + 3]
quat = numpy.array([receive_data[7 * i + 4], receive_data[7 * i + 5], receive_data[7 * i + 6], receive_data[7 * i + 7]])
quat = quat / numpy.linalg.norm(quat)
self.__tf_msgs[i].transform.rotation.w = quat[0]
self.__tf_msgs[i].transform.rotation.x = quat[1]
self.__tf_msgs[i].transform.rotation.y = quat[2]
self.__tf_msgs[i].transform.rotation.z = quat[3]
self.__seq += 1

def _publish(self) -> None:
self.__tf_broadcaster.sendTransform(self.__tf_msgs)
# if rospy.Time.now().to_sec() - self.__refresh_time.to_sec() > 1:
# self._communicate(True)
# response_meta_data_dict = self._get_response_meta_data()
# self._construct_rosmsg(response_meta_data_dict)
# self.__refresh_time = rospy.Time.now()
self.__tf_broadcaster.sendTransform(self.__tf_msgs)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from multiverse_client.utils.multiverse_utils import init_request_meta_data_dict
import rospy
from typing import List
from typing import List, Dict
from multiverse_socket import MultiverseSocket # noqa


Expand All @@ -12,32 +12,32 @@ class MultiverseRosBase:
def __init__(self, **kwargs) -> None:
self.host = kwargs.get("host", "tcp://127.0.0.1")
self.port = str(kwargs.get("port"))
self.use_thread = True
self._init_request_meta_data()

def start(self) -> None:
pass

def _init_multiverse_socket(self):
self.__multiverse_socket = MultiverseSocket(
self.use_thread, "tcp://127.0.0.1:7000"
)
def _init_multiverse_socket(self) -> None:
self.__multiverse_socket = MultiverseSocket("tcp://127.0.0.1:7000")

def _init_request_meta_data(self) -> None:
self._request_meta_data_dict = init_request_meta_data_dict()

def _connect(self) -> None:
self.__multiverse_socket.connect(self.host, self.port)
self.__multiverse_socket.start(True)
self.__multiverse_socket.start()

def _disconnect(self) -> None:
self.__multiverse_socket.disconnect()

def _set_request_meta_data(self):
def _set_request_meta_data(self) -> None:
self.__multiverse_socket.set_request_meta_data(self._request_meta_data_dict)

def _get_response_meta_data(self) -> dict:
return self.__multiverse_socket.get_response_meta_data()
def _get_response_meta_data(self) -> Dict:
response_meta_data = self.__multiverse_socket.get_response_meta_data()
if not response_meta_data:
rospy.logwarn(f"[Client {self.port}] Receive empty response meta data.")
return response_meta_data

def _set_send_data(self, send_data: List[float]) -> None:
self.__multiverse_socket.set_send_data(send_data)
Expand All @@ -46,7 +46,10 @@ def _communicate(self, resend_request_meta_data: bool = False) -> None:
self.__multiverse_socket.communicate(resend_request_meta_data)

def _get_receive_data(self) -> List[float]:
return self.__multiverse_socket.get_receive_data()
receive_data = self.__multiverse_socket.get_receive_data()
if not receive_data:
rospy.logwarn(f"[Client {self.port}] Receive empty data.")
return receive_data

def _restart(self) -> None:
self._disconnect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@

#define STRING_SIZE 200

enum class EMultiverseClientState : unsigned char
{
None,
StartConnection,
BindRequestMetaData,
SendRequestMetaData,
ReceiveResponseMetaData,
BindResponseMetaData,
InitSendAndReceiveData,
BindSendData,
SendData,
ReceiveData,
BindReceiveData
};

void MultiverseClient::connect_to_server()
{
zmq_disconnect(client_socket, socket_addr.c_str());
Expand Down Expand Up @@ -65,10 +80,8 @@ void MultiverseClient::connect_to_server()
if (socket_addr.compare(receive_socket_addr) != 0)
{
flag = EMultiverseClientState::None;
return;
}

if (flag == EMultiverseClientState::None || flag == EMultiverseClientState::ReceiveData)
else if (flag == EMultiverseClientState::None || flag == EMultiverseClientState::ReceiveData)
{
flag = EMultiverseClientState::StartConnection;

Expand All @@ -91,13 +104,8 @@ void MultiverseClient::connect(const std::string &in_host, const std::string &in
connect();
}

void MultiverseClient::start(const bool wait_for_server)
void MultiverseClient::start()
{
if (wait_for_server)
{
wait_for_connect_to_server_thread_finish();
}

flag == EMultiverseClientState::StartConnection;
printf("[Client %s] Start.\n", port.c_str());
run();
Expand Down Expand Up @@ -144,9 +152,11 @@ void MultiverseClient::run()
case EMultiverseClientState::BindRequestMetaData:
bind_request_meta_data();

printf("[Client %s] Sent meta data to the server:%*.*s\n", port.c_str(), STRING_SIZE, STRING_SIZE, request_meta_data_str.c_str());
// printf("[Client %s] Sent meta data to the server:%*.*s\n", port.c_str(), STRING_SIZE, STRING_SIZE, request_meta_data_str.c_str());

wait_for_meta_data_thread_finish();
start_meta_data_thread();
flag = EMultiverseClientState::InitSendAndReceiveData;
return;

case EMultiverseClientState::SendRequestMetaData:
Expand All @@ -158,7 +168,7 @@ void MultiverseClient::run()
case EMultiverseClientState::ReceiveResponseMetaData:
receive_response_meta_data();

printf("[Client %s] Received meta data from the server:%*.*s\n", port.c_str(), STRING_SIZE, STRING_SIZE, response_meta_data_str.c_str());
// printf("[Client %s] Received meta data from the server:%*.*s\n", port.c_str(), STRING_SIZE, STRING_SIZE, response_meta_data_str.c_str());

if (should_shut_down)
{
Expand Down Expand Up @@ -209,19 +219,12 @@ void MultiverseClient::run()
case EMultiverseClientState::ReceiveData:
zmq_recv(client_socket, receive_buffer, receive_buffer_size * sizeof(double), 0);

if (should_shut_down)
{
flag = EMultiverseClientState::BindReceiveData;
break;
}

if (std::isnan(*receive_buffer) || *receive_buffer < 0)
if (!should_shut_down && (std::isnan(*receive_buffer) || *receive_buffer < 0))
{
printf("[Client %s] The socket %s from the server has been terminated, returning to resend the meta data.\n", port.c_str(), socket_addr.c_str());

wait_for_connect_to_server_thread_finish();
start_connect_to_server_thread();

return;
}
else
Expand Down Expand Up @@ -331,22 +334,21 @@ void MultiverseClient::communicate(const bool resend_request_meta_data)
}
else
{
if (flag == EMultiverseClientState::BindSendData || flag == EMultiverseClientState::InitSendAndReceiveData)
if (resend_request_meta_data)
{
if (resend_request_meta_data)
wait_for_meta_data_thread_finish();
if (flag == EMultiverseClientState::BindSendData)
{
if (flag == EMultiverseClientState::BindSendData)
{
init_objects();
}
else if (flag == EMultiverseClientState::InitSendAndReceiveData)
{
wait_for_meta_data_thread_finish();
}

clean_up();
flag = EMultiverseClientState::BindRequestMetaData;
init_objects();
}
clean_up();

flag = EMultiverseClientState::BindRequestMetaData;

run();
}
else if (flag == EMultiverseClientState::BindSendData || flag == EMultiverseClientState::InitSendAndReceiveData)
{
run();
}
}
Expand Down
Loading

0 comments on commit 255bb49

Please sign in to comment.