diff --git a/include/gz/transport/Node.hh b/include/gz/transport/Node.hh index b7d0272e4..e27d1c030 100644 --- a/include/gz/transport/Node.hh +++ b/include/gz/transport/Node.hh @@ -561,6 +561,27 @@ namespace gz std::function &_callback); + /// \brief Request a new service using a non-blocking call. + /// In this version the callback is a lambda function and the + /// request and response may be abstract types. + /// \param[in] _topic Service name requested. + /// \param[in] _request Protobuf message containing the request's + /// parameters. + /// \param[in] _callback Lambda function executed when the response + /// arrives. The callback has the following parameters: + /// * _reply Protobuf message containing the response. + /// * _result Result of the service call. If false, there was + /// a problem executing your request. + /// \param[in] _repType Message type used in the response. + /// \return true when the service call was succesfully requested. + public: template + bool Request( + const std::string &_topic, + const RequestT &_request, + std::function &_callback, + const char *_repType); + /// \brief Request a new service without input parameter using a /// non-blocking call. /// In this version the callback is a lambda function. diff --git a/include/gz/transport/ReqHandler.hh b/include/gz/transport/ReqHandler.hh index bce02a7e0..52bab1d5b 100644 --- a/include/gz/transport/ReqHandler.hh +++ b/include/gz/transport/ReqHandler.hh @@ -32,6 +32,8 @@ #include #include +#include + #include "gz/transport/config.hh" #include "gz/transport/Export.hh" #include "gz/transport/TransportTypes.hh" @@ -318,6 +320,43 @@ namespace gz { } + /// \brief Create a specific protobuf message given its serialized data. + /// \param[in] _data The serialized data. + /// \return Pointer to the specific protobuf message. + public: std::shared_ptr + CreateMsg(const std::string &_data) const + { + // Instantiate a specific protobuf message + std::shared_ptr msgPtr = + gz::msgs::Factory::New(this->RepTypeName()); + if (!msgPtr) + { + std::cerr << "Unable to create response of type[" + << this->RepTypeName() << "].\n"; + return nullptr; + } + + // Create the message using some serialized data + if (!msgPtr->ParseFromString(_data)) + { + std::cerr << "ReqHandler::CreateMsg() error: ParseFromString failed" + << std::endl; + } + + return msgPtr; + } + + /// \brief Set the callback for this handler. + /// \param[in] _cb The callback with the following parameters: + /// * _rep Protobuf message containing the service response. + /// * _result True when the service request was successful or + /// false otherwise. + public: void SetCallback(const std::function &_cb) + { + this->cb = _cb; + } + /// \brief Set the REQ protobuf message for this handler. /// \param[in] _reqMsg Protofub message containing the input parameters of /// of the service request. @@ -371,8 +410,25 @@ namespace gz // Documentation inherited. public: void NotifyResult(const std::string &_rep, const bool _result) { - this->rep = _rep; - this->result = _result; + // Execute the callback (if existing). + if (this->cb) + { + // Instantiate the specific protobuf message associated to this topic. + auto msg = this->CreateMsg(_rep); + if (!msg) + { + /// \todo(srmainwaring) verify this is the correct fail behaviour + this->result = false; + this->repAvailable = false; + this->condition.notify_one(); + } + this->cb(*msg, _result); + } + else + { + this->rep = _rep; + this->result = _result; + } this->repAvailable = true; this->condition.notify_one(); @@ -409,6 +465,14 @@ namespace gz /// \brief Protobuf message containing the response. private: google::protobuf::Message *repMsg = nullptr; + + /// \brief Callback to the function registered for this handler with the + /// following parameters: + /// \param[in] _rep Protobuf message containing the service response. + /// \param[in] _result True when the service request was successful or + /// false otherwise. + private: std::function cb; }; } } diff --git a/include/gz/transport/detail/Node.hh b/include/gz/transport/detail/Node.hh index d069ebdbe..cc0f90830 100644 --- a/include/gz/transport/detail/Node.hh +++ b/include/gz/transport/detail/Node.hh @@ -477,6 +477,99 @@ namespace gz return true; } + ////////////////////////////////////////////////// + template + bool Node::Request( + const std::string &_topic, + const RequestT &_request, + std::function &_cb, + const char *_repType) + { + auto rep = gz::msgs::Factory::New(_repType); + if (!rep) + { + std::cerr << "Unable to create response of type[" + << _repType << "].\n"; + return false; + } + + // Topic remapping. + std::string topic = _topic; + this->Options().TopicRemap(_topic, topic); + + std::string fullyQualifiedTopic; + if (!TopicUtils::FullyQualifiedName(this->Options().Partition(), + this->Options().NameSpace(), topic, fullyQualifiedTopic)) + { + std::cerr << "Service [" << topic << "] is not valid." << std::endl; + return false; + } + + bool localResponserFound; + IRepHandlerPtr repHandler; + { + std::lock_guard lk(this->Shared()->mutex); + localResponserFound = this->Shared()->repliers.FirstHandler( + fullyQualifiedTopic, + _request.GetTypeName(), + rep->GetTypeName(), + repHandler); + } + + // If the responser is within my process. + if (localResponserFound) + { + // There is a responser in my process, let's use it. + bool result = repHandler->RunLocalCallback(_request, *rep); + + _cb(*rep, result); + return true; + } + + // Create a new request handler. + std::shared_ptr> reqHandlerPtr( + new ReqHandler(this->NodeUuid())); + + // Insert the request's parameters. + reqHandlerPtr->SetMessage(&_request); + + // Set the response message (to set the type info). + reqHandlerPtr->SetResponse(rep.get()); + + // Insert the callback into the handler. + reqHandlerPtr->SetCallback(_cb); + + { + std::lock_guard lk(this->Shared()->mutex); + + // Store the request handler. + this->Shared()->requests.AddHandler( + fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr); + + // If the responser's address is known, make the request. + SrvAddresses_M addresses; + if (this->Shared()->TopicPublishers(fullyQualifiedTopic, addresses)) + { + this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic, + _request.GetTypeName(), rep->GetTypeName()); + } + else + { + // Discover the service responser. + if (!this->Shared()->DiscoverService(fullyQualifiedTopic)) + { + std::cerr << "Node::Request(): Error discovering service [" + << topic + << "]. Did you forget to start the discovery service?" + << std::endl; + return false; + } + } + } + + return true; + } + ////////////////////////////////////////////////// template bool Node::Request(