Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add overloaded Request for non-blocking calls using abstract types #386

Open
wants to merge 3 commits into
base: gz-transport12
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions include/gz/transport/Node.hh
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,27 @@ namespace gz
std::function<void(const ReplyT &_reply,
const bool _result)> &_callback);

/// \brief Request a new service using a non-blocking call.
/// In this version the callback is a lambda function and the
/// request and response may be abstract types.
/// \param[in] _topic Service name requested.
/// \param[in] _request Protobuf message containing the request's
/// parameters.
/// \param[in] _callback Lambda function executed when the response
/// arrives. The callback has the following parameters:
/// * _reply Protobuf message containing the response.
/// * _result Result of the service call. If false, there was
/// a problem executing your request.
/// \param[in] _repType Message type used in the response.
/// \return true when the service call was succesfully requested.
public: template<typename RequestT, typename ReplyT>
bool Request(
const std::string &_topic,
const RequestT &_request,
std::function<void(const ReplyT &_reply,
const bool _result)> &_callback,
const char *_repType);

/// \brief Request a new service without input parameter using a
/// non-blocking call.
/// In this version the callback is a lambda function.
Expand Down
68 changes: 66 additions & 2 deletions include/gz/transport/ReqHandler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include <memory>
#include <string>

#include <gz/msgs/Factory.hh>

#include "gz/transport/config.hh"
#include "gz/transport/Export.hh"
#include "gz/transport/TransportTypes.hh"
Expand Down Expand Up @@ -318,6 +320,43 @@
{
}

/// \brief Create a specific protobuf message given its serialized data.
/// \param[in] _data The serialized data.
/// \return Pointer to the specific protobuf message.
public: std::shared_ptr<google::protobuf::Message>
CreateMsg(const std::string &_data) const

Check warning on line 327 in include/gz/transport/ReqHandler.hh

View check run for this annotation

Codecov / codecov/patch

include/gz/transport/ReqHandler.hh#L327

Added line #L327 was not covered by tests
{
// Instantiate a specific protobuf message
std::shared_ptr<google::protobuf::Message> msgPtr =
gz::msgs::Factory::New(this->RepTypeName());
if (!msgPtr)

Check warning on line 332 in include/gz/transport/ReqHandler.hh

View check run for this annotation

Codecov / codecov/patch

include/gz/transport/ReqHandler.hh#L331-L332

Added lines #L331 - L332 were not covered by tests
{
std::cerr << "Unable to create response of type["
<< this->RepTypeName() << "].\n";
return nullptr;

Check warning on line 336 in include/gz/transport/ReqHandler.hh

View check run for this annotation

Codecov / codecov/patch

include/gz/transport/ReqHandler.hh#L335-L336

Added lines #L335 - L336 were not covered by tests
}

// Create the message using some serialized data
if (!msgPtr->ParseFromString(_data))

Check warning on line 340 in include/gz/transport/ReqHandler.hh

View check run for this annotation

Codecov / codecov/patch

include/gz/transport/ReqHandler.hh#L340

Added line #L340 was not covered by tests
{
std::cerr << "ReqHandler::CreateMsg() error: ParseFromString failed"
<< std::endl;

Check warning on line 343 in include/gz/transport/ReqHandler.hh

View check run for this annotation

Codecov / codecov/patch

include/gz/transport/ReqHandler.hh#L342-L343

Added lines #L342 - L343 were not covered by tests
}

return msgPtr;

Check warning on line 346 in include/gz/transport/ReqHandler.hh

View check run for this annotation

Codecov / codecov/patch

include/gz/transport/ReqHandler.hh#L346

Added line #L346 was not covered by tests
}

/// \brief Set the callback for this handler.
/// \param[in] _cb The callback with the following parameters:
/// * _rep Protobuf message containing the service response.
/// * _result True when the service request was successful or
/// false otherwise.
public: void SetCallback(const std::function <void(
const google::protobuf::Message &_rep, const bool _result)> &_cb)
{
this->cb = _cb;
}

/// \brief Set the REQ protobuf message for this handler.
/// \param[in] _reqMsg Protofub message containing the input parameters of
/// of the service request.
Expand Down Expand Up @@ -371,8 +410,25 @@
// Documentation inherited.
public: void NotifyResult(const std::string &_rep, const bool _result)
{
this->rep = _rep;
this->result = _result;
// Execute the callback (if existing).
if (this->cb)
{
// Instantiate the specific protobuf message associated to this topic.
auto msg = this->CreateMsg(_rep);
if (!msg)

Check warning on line 418 in include/gz/transport/ReqHandler.hh

View check run for this annotation

Codecov / codecov/patch

include/gz/transport/ReqHandler.hh#L417-L418

Added lines #L417 - L418 were not covered by tests
{
/// \todo(srmainwaring) verify this is the correct fail behaviour
this->result = false;
this->repAvailable = false;
this->condition.notify_one();

Check warning on line 423 in include/gz/transport/ReqHandler.hh

View check run for this annotation

Codecov / codecov/patch

include/gz/transport/ReqHandler.hh#L421-L423

Added lines #L421 - L423 were not covered by tests
}
this->cb(*msg, _result);

Check warning on line 425 in include/gz/transport/ReqHandler.hh

View check run for this annotation

Codecov / codecov/patch

include/gz/transport/ReqHandler.hh#L425

Added line #L425 was not covered by tests
}
else
{
this->rep = _rep;
this->result = _result;
}

this->repAvailable = true;
this->condition.notify_one();
Expand Down Expand Up @@ -409,6 +465,14 @@

/// \brief Protobuf message containing the response.
private: google::protobuf::Message *repMsg = nullptr;

/// \brief Callback to the function registered for this handler with the
/// following parameters:
/// \param[in] _rep Protobuf message containing the service response.
/// \param[in] _result True when the service request was successful or
/// false otherwise.
private: std::function<void(
const google::protobuf::Message &_rep, const bool _result)> cb;
};
}
}
Expand Down
93 changes: 93 additions & 0 deletions include/gz/transport/detail/Node.hh
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,99 @@ namespace gz
return true;
}

//////////////////////////////////////////////////
template<typename RequestT, typename ReplyT>
bool Node::Request(
const std::string &_topic,
const RequestT &_request,
std::function<void(const ReplyT &_reply, const bool _result)> &_cb,
const char *_repType)
{
auto rep = gz::msgs::Factory::New(_repType);
if (!rep)
{
std::cerr << "Unable to create response of type["
<< _repType << "].\n";
return false;
}

// Topic remapping.
std::string topic = _topic;
this->Options().TopicRemap(_topic, topic);

std::string fullyQualifiedTopic;
if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
this->Options().NameSpace(), topic, fullyQualifiedTopic))
{
std::cerr << "Service [" << topic << "] is not valid." << std::endl;
return false;
}

bool localResponserFound;
IRepHandlerPtr repHandler;
{
std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
localResponserFound = this->Shared()->repliers.FirstHandler(
fullyQualifiedTopic,
_request.GetTypeName(),
rep->GetTypeName(),
repHandler);
}

// If the responser is within my process.
if (localResponserFound)
{
// There is a responser in my process, let's use it.
bool result = repHandler->RunLocalCallback(_request, *rep);

_cb(*rep, result);
return true;
}

// Create a new request handler.
std::shared_ptr<ReqHandler<RequestT, ReplyT>> reqHandlerPtr(
new ReqHandler<RequestT, ReplyT>(this->NodeUuid()));

// Insert the request's parameters.
reqHandlerPtr->SetMessage(&_request);

// Set the response message (to set the type info).
reqHandlerPtr->SetResponse(rep.get());

// Insert the callback into the handler.
reqHandlerPtr->SetCallback(_cb);

{
std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);

// Store the request handler.
this->Shared()->requests.AddHandler(
fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);

// If the responser's address is known, make the request.
SrvAddresses_M addresses;
if (this->Shared()->TopicPublishers(fullyQualifiedTopic, addresses))
{
this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
_request.GetTypeName(), rep->GetTypeName());
}
else
{
// Discover the service responser.
if (!this->Shared()->DiscoverService(fullyQualifiedTopic))
{
std::cerr << "Node::Request(): Error discovering service ["
<< topic
<< "]. Did you forget to start the discovery service?"
<< std::endl;
return false;
}
}
}

return true;
}

//////////////////////////////////////////////////
template<typename ReplyT>
bool Node::Request(
Expand Down