Skip to content

Commit

Permalink
Implement SocketUTransport for mock testing.
Browse files Browse the repository at this point in the history
This PR implements a socket transport to the python dispacther script,
and necessary data structures for listener callbacks and callback
removal. The high level code is a private implementation in
SocketUTransport.cpp. The unordered_map supporting callbacks is in
SafeTupleMap.h. The details of this is std::hash extensions to hash
tuples of optionals for filtering with wildcards.
  • Loading branch information
debruce committed Jul 16, 2024
1 parent 284e214 commit 6fc7268
Show file tree
Hide file tree
Showing 9 changed files with 426 additions and 520 deletions.
9 changes: 8 additions & 1 deletion up_client_socket/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ cmake_minimum_required(VERSION 3.20.1)
project(up_client_socket VERSION 0.1.0 LANGUAGES CXX DESCRIPTION "C++ socket transport")

find_package(up-cpp REQUIRED)
find_package(protobuf REQUIRED)
find_package(up-core-api REQUIRED)
find_package(spdlog REQUIRED)
add_definitions(-DSPDLOG_FMT_EXTERNAL)
find_package(fmt REQUIRED CONFIG)
Expand All @@ -39,6 +41,8 @@ target_include_directories(${PROJECT_NAME}
$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}>
${rapidjson_INCLUDE_DIRS}
${up-cpp_INCLUDE_DIR}
${up-core-api_INCLUDE_DIR}
${protobuf_INCLUDE_DIR}
${spdlog_INCLUDE_DIR})

set_property(TARGET ${PROJECT_NAME} PROPERTY POSITION_INDEPENDENT_CODE ON)
Expand All @@ -50,9 +54,12 @@ target_link_libraries(${PROJECT_NAME}
dl
spdlog::spdlog
up-cpp::up-cpp
up-core-api::up-core-api
protobuf::libprotobuf
fmt::fmt
rapidjson)

# Specify the install location for the library
INSTALL(TARGETS ${PROJECT_NAME})
INSTALL(DIRECTORY include DESTINATION .)
INSTALL(DIRECTORY include DESTINATION .)

73 changes: 0 additions & 73 deletions up_client_socket/cpp/conanfile.py

This file was deleted.

5 changes: 3 additions & 2 deletions up_client_socket/cpp/conanfile.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[requires]
up-core-api/1.5.8
protobuf/3.21.12
up-cpp/0.1.2-dev
up-cpp/0.2.0
rapidjson/cci.20230929
spdlog/1.13.0
fmt/10.2.1
Expand All @@ -11,4 +12,4 @@ CMakeDeps
CMakeToolchain

[layout]
cmake_layout
cmake_layout
43 changes: 43 additions & 0 deletions up_client_socket/cpp/include/SafeTupleMap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once

#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>

#include "TupleOfOptionals.h"

template <typename KEY, typename VALUE>
class SafeTupleMap {
std::unordered_map<KEY, std::shared_ptr<VALUE>,
tuple_of_optionals::hash<KEY>>
map_;
std::mutex mtx;

public:
using Key = KEY;

SafeTupleMap() = default;

std::shared_ptr<VALUE> find(const KEY& key, bool create = false) {
std::unique_lock<std::mutex> lock(mtx);
auto it = map_.find(key);
if (!create) {
return (it != map_.end()) ? it->second : nullptr;
} else {
if (it != map_.end())
return it->second;
auto ptr = std::make_shared<VALUE>();
map_.emplace(key, ptr);
return ptr;
}
}

void erase(std::function<void(std::shared_ptr<VALUE>)> fn) {
std::unique_lock<std::mutex> lock(mtx);
for (auto [key, ptr] : map_) {
fn(ptr);
}
}
};
135 changes: 13 additions & 122 deletions up_client_socket/cpp/include/SocketUTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,10 @@
#ifndef _SOCKET_UTRANSPORT_H_
#define _SOCKET_UTRANSPORT_H_

#include <unistd.h>
#include <up-core-api/umessage.pb.h>
#include <up-cpp/rpc/RpcClient.h>
#include <up-cpp/transport/UTransport.h>
#include <up-cpp/transport/builder/UAttributesBuilder.h>
#include <up-cpp/uri/builder/BuildUUri.h>
#include <up-cpp/utils/ThreadPool.h>
#include <up-cpp/uuid/factory/Uuidv8Factory.h>
#include <up-cpp/uuid/serializer/UuidSerializer.h>

#include <memory>
#include <string>

/// @class SocketUTransport
/// @brief Represents a socket-based implementation of the UTransport interface
Expand All @@ -29,127 +24,23 @@
/// The SocketUTransport class provides functionality for sending messages,
/// registering and unregistering listeners, and invoking remote methods over a
/// socket connection. It inherits from the UTransport and RpcClient classes.
class SocketUTransport : public uprotocol::utransport::UTransport,
public uprotocol::rpc::RpcClient {
class SocketUTransport : public uprotocol::transport::UTransport {
public:
/// @brief Constructs a SocketUTransport object.
SocketUTransport();

/// @brief Destroys the SocketUTransport object.
~SocketUTransport();

/// UTransport API's

/// @brief Sends a UMessage over the transport.
/// @param[in] transportUMessage The UMessage to send.
/// @return The status of the send operation.
uprotocol::v1::UStatus send(
const uprotocol::utransport::UMessage& transportUMessage) override;

/// @brief Registers a listener for a specific topic.
/// @param[in] topic The topic to register the listener for.
/// @param[in] listener The listener to register.
/// @return The status of the registration operation.
uprotocol::v1::UStatus registerListener(
const uprotocol::v1::UUri& topic,
const uprotocol::utransport::UListener& listener) override;

/// @brief Unregisters a listener for a specific topic.
/// @param[in] topic The topic to unregister the listener from.
/// @param[in] listener The listener to unregister.
/// @return The status of the unregistration operation.
uprotocol::v1::UStatus unregisterListener(
const uprotocol::v1::UUri& topic,
const uprotocol::utransport::UListener& listener) override;

/// @brief Invokes a remote method asynchronously and returns a future for
/// the response.
/// @param[in] topic The topic of the remote method.
/// @param[in] payload The payload of the remote method.
/// @param[in] options The call options for the remote method.
/// @return A future for the response of the remote method.
std::future<uprotocol::rpc::RpcResponse> invokeMethod(
const uprotocol::v1::UUri& topic,
const uprotocol::utransport::UPayload& payload,
const uprotocol::v1::CallOptions& options) override;

/// @brief Invokes a remote method asynchronously and registers a callback
/// for the response.
/// @param[in] topic The topic of the remote method.
/// @param[in] payload The payload of the remote method.
/// @param[in] options The call options for the remote method.
/// @param[in] callback The callback to be invoked when the response is
/// received.
/// @return The status of the invocation operation.
uprotocol::v1::UStatus invokeMethod(
const uprotocol::v1::UUri& topic,
const uprotocol::utransport::UPayload& payload,
const uprotocol::v1::CallOptions& options,
const uprotocol::utransport::UListener& callback) override;
SocketUTransport(const uprotocol::v1::UUri&);

private:
// The IP address of the dispatcher.
constexpr static const char* DISPATCHER_IP = "127.0.0.1";
// The port number of the dispatcher.
constexpr static const int DISPATCHER_PORT = 44444;
// The maximum length of a message in bytes.
constexpr static const int BYTES_MSG_LENGTH = 32767;

static const uprotocol::v1::UUri RESPONSE_URI; // The URI for responses.
std::thread processThread; // The thread for processing messages.
std::thread timeoutThread; // The thread for handling timeouts.
int socketFd; // The file descriptor for the socket.
std::mutex mutex_; // A mutex for thread synchronization.
std::mutex mutex_promise; // A mutex for synchronizing access to promises.

// A type alias for the key used in the uriToListener map.
using uuriKey = size_t;
// A type alias for the key used in the reqidToFutureUMessage map.
using uuidStr = std::string;

// A map from URIs to listeners. Each URI can have multiple listeners.
std::unordered_map<uuriKey,
std::vector<const uprotocol::utransport::UListener*>>
uriToListener;

// A map from request IDs to futures. Each request ID corresponds to a
// future for a UMessage.
std::unordered_map<uuidStr, std::promise<uprotocol::rpc::RpcResponse>>
reqidToFutureUMessage;

/// @brief Listens for incoming messages on the socket.
void listen();

/// @brief Handles a publish message received on the socket.
/// @param[in] umsg The UMessage representing the publish message.
void handlePublishMessage(const uprotocol::v1::UMessage umsg);

/// @brief Handles a request message received on the socket.
/// @param[in] umsg The UMessage representing the request message.
void handleRequestMessage(const uprotocol::v1::UMessage umsg);
[[nodiscard]] uprotocol::v1::UStatus sendImpl(
const uprotocol::v1::UMessage& message) override;

/// @brief Handles a response message received on the socket.
/// @param[in] umsg The UMessage representing the response message.
void handleResponseMessage(const uprotocol::v1::UMessage umsg);
[[nodiscard]] uprotocol::v1::UStatus registerListenerImpl(
const uprotocol::v1::UUri& sink_filter, CallableConn&& listener,
std::optional<uprotocol::v1::UUri>&& source_filter) override;

/// @brief Notifies the registered listeners for a specific URI about a
/// received message.
/// @param[in] uri The URI of the received message.
/// @param[in] umsg The UMessage representing the received message.
void notifyListeners(const uprotocol::v1::UUri uri,
const uprotocol::v1::UMessage umsg);
void cleanupListener(CallableConn listener) override;

/// @brief Counts the timeout for a request and handles the future and
/// promise accordingly.
/// @param[in] req_id The UUID of the request.
/// @param[in] resFuture The future for the response.
/// @param[in,out] promise The promise for the response.
/// @param[in] timeout The timeout value in milliseconds.
void timeout_counter(
const uprotocol::uuid::UUID& req_id,
const std::future<uprotocol::rpc::RpcResponse>& resFuture,
std::promise<uprotocol::rpc::RpcResponse>& promise,
const std::chrono::milliseconds timeout);
struct Impl;
std::shared_ptr<Impl> pImpl;
};

#endif // _SOCKET_UTRANSPORT_H_
Loading

0 comments on commit 6fc7268

Please sign in to comment.