Skip to content

Commit

Permalink
[#9600] YSQL SMARTDRIVER: Incorrect host value being return in Kubern…
Browse files Browse the repository at this point in the history
…etes environment

Summary:
The value of the 'host' column in the result of //yb_servers// function in Kubernetes environment is incorrect when server_broadcast address is also set for tservers. The value in 'host' should be exactly same as what the 'preferred_ip' shows while querying "system.peers" ycql table. This value is correct when server broadcast address is not passed in the startup argument. But when that is passed then this starts showing the value of what was passed for server broadcast address instead.

Additionally, this change also addresses some code review comments which on git commit: 6df6555 as pointed by @d-uspenskiy

- Use iterator on the returned server list instead of using 'at' to avoid throwing exception
- Refactor the code to move the pgalloc function call out from PgSession to pggate layer to avoid tight coupling of pg layer and the ybase layer.
- Proper return value checks at a couple of places.
- This also includes a change from @d-uspenskiy to make compiler warn about return value not being handled from function returning 'Status'

Test Plan: Tested with the unit test TestLoadBalance, manual testing and demo sample apps in jdbc-yugabtedb repository.

Reviewers: dmitry

Reviewed By: dmitry

Subscribers: dmitry, mihnea, zyu, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D12494
  • Loading branch information
kneeraj committed Aug 13, 2021
1 parent f825627 commit c09d3c8
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 52 deletions.
8 changes: 3 additions & 5 deletions src/postgres/src/backend/utils/misc/pg_yb_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -1361,10 +1361,9 @@ yb_servers(PG_FUNCTION_ARGS)
"public_ip", TEXTOID, -1, 0);
funcctx->tuple_desc = BlessTupleDesc(tupdesc);

// Assuming not more than 1000 servers
YBCServerDescriptor **servers = (YBCServerDescriptor**)palloc0(1000 * sizeof(YBCServerDescriptor *));
YBCServerDescriptor *servers = NULL;
int numservers = 0;
YBCGetTabletServerHosts(servers, &numservers);
HandleYBStatus(YBCGetTabletServerHosts(&servers, &numservers));
funcctx->max_calls = numservers;
funcctx->user_fctx = servers;
MemoryContextSwitchTo(oldcontext);
Expand All @@ -1375,9 +1374,8 @@ yb_servers(PG_FUNCTION_ARGS)
Datum values[8];
bool nulls[8];
HeapTuple tuple;
YBCServerDescriptor** servers = (YBCServerDescriptor **)funcctx->user_fctx;
int cntr = funcctx->call_cntr;
YBCServerDescriptor *server = *(servers + cntr);
YBCServerDescriptor *server = (YBCServerDescriptor *)funcctx->user_fctx + cntr;
bool is_primary = server->isPrimary;
const char *node_type = is_primary ? "primary" : "read_replica";
// TODO: Remove hard coding of port and num_connections
Expand Down
12 changes: 8 additions & 4 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1490,8 +1490,7 @@ Status YBClient::ListTabletServers(vector<std::unique_ptr<YBTabletServer>>* tabl
return Status::OK();
}

Status YBClient::ListLiveTabletServers(
vector<std::unique_ptr<YBTabletServerPlacementInfo>>* tablet_servers, bool primary_only) {
Status YBClient::ListLiveTabletServers(TabletServersInfo* tablet_servers, bool primary_only) {
ListLiveTabletServersRequestPB req;
if (primary_only) req.set_primary_only(true);
ListLiveTabletServersResponsePB resp;
Expand All @@ -1504,6 +1503,12 @@ Status YBClient::ListLiveTabletServers(
std::string region = "";
std::string zone = "";
int broadcast_sz = entry.registration().common().broadcast_addresses().size();
int private_ip_addresses_sz = entry.registration().common().private_rpc_addresses().size();

const auto privateIp =
private_ip_addresses_sz > 0
? entry.registration().common().private_rpc_addresses().Get(0).host()
: DesiredHostPort(entry.registration().common(), data_->cloud_info_pb_).host();

std::string publicIp = "";
if (broadcast_sz > 0) {
Expand All @@ -1522,8 +1527,7 @@ Status YBClient::ListLiveTabletServers(
}

auto ts = std::make_unique<YBTabletServerPlacementInfo>(
entry.instance_id().permanent_uuid(),
DesiredHostPort(entry.registration().common(), data_->cloud_info_pb_).host(),
entry.instance_id().permanent_uuid(), privateIp,
entry.registration().common().placement_uuid(), cloud, region, zone, isPrimary,
publicIp, entry.registration().common().pg_port());
tablet_servers->push_back(std::move(ts));
Expand Down
7 changes: 4 additions & 3 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ class YBClientBuilder {
// This class is thread-safe.
class YBClient {
public:
using TabletServersInfo = std::vector<std::unique_ptr<yb::client::YBTabletServerPlacementInfo>>;

~YBClient();

std::unique_ptr<YBTableCreator> NewTableCreator();
Expand Down Expand Up @@ -562,9 +564,8 @@ class YBClient {

CHECKED_STATUS ListTabletServers(std::vector<std::unique_ptr<YBTabletServer>>* tablet_servers);

CHECKED_STATUS ListLiveTabletServers(
std::vector<std::unique_ptr<YBTabletServerPlacementInfo>>* tablet_servers,
bool primary_only = false);
CHECKED_STATUS ListLiveTabletServers(TabletServersInfo* tablet_servers,
bool primary_only = false);

// Sets local tserver and its proxy.
void SetLocalTabletServer(const std::string& ts_uuid,
Expand Down
1 change: 1 addition & 0 deletions src/yb/client/tablet_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// under the License.
//

#include <string>
#ifndef YB_CLIENT_TABLET_SERVER_H
#define YB_CLIENT_TABLET_SERVER_H

Expand Down
6 changes: 3 additions & 3 deletions src/yb/util/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ class NODISCARD_CLASS Result {
return *new (this) Result(std::move(value));
}

explicit operator bool() const {
MUST_USE_RESULT explicit operator bool() const {
return ok();
}

bool operator!() const {
MUST_USE_RESULT bool operator!() const {
return !ok();
}

bool ok() const {
MUST_USE_RESULT bool ok() const {
#ifndef NDEBUG
ANNOTATE_IGNORE_WRITES_BEGIN();
success_checked_ = true;
Expand Down
2 changes: 1 addition & 1 deletion src/yb/util/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ class STATUS_NODISCARD_CLASS Status {
Status() {}

// Returns true if the status indicates success.
bool ok() const { return state_ == nullptr; }
MUST_USE_RESULT bool ok() const { return state_ == nullptr; }

// Declares set of Is* functions
BOOST_PP_SEQ_FOR_EACH(YB_STATUS_FORWARD_MACRO, YB_STATUS_CODE_IS_FUNC, YB_STATUS_CODES)
Expand Down
31 changes: 3 additions & 28 deletions src/yb/yql/pggate/pg_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1228,35 +1228,10 @@ Status PgSession::TabletServerCount(int *tserver_count, bool primary_only, bool
return client_->TabletServerCount(tserver_count, primary_only, use_cache);
}

Status PgSession::ListTabletServers(YBCServerDescriptor **servers, int *numofservers) {
Result<client::YBClient::TabletServersInfo> PgSession::ListTabletServers() {
std::vector<std::unique_ptr<yb::client::YBTabletServerPlacementInfo>> tablet_servers;
Status ret = client_->ListLiveTabletServers(&tablet_servers, false);
*numofservers = tablet_servers.size();
int cnt = *numofservers;
if (cnt > 0) {
for (int i = 0; i < cnt; i++) {
std::string host = tablet_servers.at(i)->hostname();
std::string cloud = tablet_servers.at(i)->cloud();
std::string region = tablet_servers.at(i)->region();
std::string zone = tablet_servers.at(i)->zone();
std::string publicIp = tablet_servers.at(i)->publicIp();
bool isPrimary = tablet_servers.at(i)->isPrimary();
const char *hostC = YBCPAllocStdString(host);
const char *cloudC = YBCPAllocStdString(cloud);
const char *regionC = YBCPAllocStdString(region);
const char *zoneC = YBCPAllocStdString(zone);
const char *publicIpC = YBCPAllocStdString(publicIp);
servers[i] = reinterpret_cast<YBCServerDescriptor *>(YBCPAlloc(sizeof(YBCServerDescriptor)));
servers[i]->pgPort = tablet_servers[i]->pg_port();
servers[i]->host = hostC;
servers[i]->cloud = cloudC;
servers[i]->region = regionC;
servers[i]->zone = zoneC;
servers[i]->publicIp = publicIpC;
servers[i]->isPrimary = isPrimary;
}
}
return ret;
RETURN_NOT_OK(client_->ListLiveTabletServers(&tablet_servers, false));
return tablet_servers;
}

void PgSession::SetTimeout(const int timeout_ms) {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/yql/pggate/pg_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class PgSession : public RefCountedThreadSafe<PgSession> {

// Smart driver functions.
// -------------
CHECKED_STATUS ListTabletServers(YBCServerDescriptor **tablet_servers, int *numofservers);
Result<client::YBClient::TabletServersInfo> ListTabletServers();

//------------------------------------------------------------------------------------------------
// Access functions.
Expand Down
5 changes: 3 additions & 2 deletions src/yb/yql/pggate/pggate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <boost/optional.hpp>

#include "yb/client/tablet_server.h"
#include "yb/client/yb_table_name.h"

#include "yb/common/pg_system_attr.h"
Expand Down Expand Up @@ -1406,8 +1407,8 @@ void PgApiImpl::SetTimeout(const int timeout_ms) {
pg_session_->SetTimeout(timeout_ms);
}

void PgApiImpl::ListTabletServers(YBCServerDescriptor **tablet_servers, int *numofservers) {
pg_session_->ListTabletServers(tablet_servers, numofservers).ok();
Result<client::YBClient::TabletServersInfo> PgApiImpl::ListTabletServers() {
return pg_session_->ListTabletServers();
}

} // namespace pggate
Expand Down
2 changes: 1 addition & 1 deletion src/yb/yql/pggate/pggate.h
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ class PgApiImpl {
std::unique_ptr<rpc::Messenger> messenger;
};

void ListTabletServers(YBCServerDescriptor **tablet_servers, int *numofservers);
Result<client::YBClient::TabletServersInfo> ListTabletServers();

private:
// Control variables.
Expand Down
2 changes: 1 addition & 1 deletion src/yb/yql/pggate/ybc_pg_typedefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ typedef struct PgServerDescriptor {
const char *cloud;
const char *region;
const char *zone;
bool isPrimary;
const char *publicIp;
bool isPrimary;
uint16_t pgPort;
} YBCServerDescriptor;

Expand Down
29 changes: 27 additions & 2 deletions src/yb/yql/pggate/ybc_pggate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

#include <cds/init.h> // NOLINT

#include "yb/client/tablet_server.h"

#include "yb/common/common_flags.h"
#include "yb/common/ql_value.h"
#include "yb/common/ybc-internal.h"
Expand Down Expand Up @@ -1021,8 +1023,31 @@ void YBCSetTimeout(int timeout_ms, void* extra) {
pgapi->SetTimeout(timeout_ms);
}

void YBCGetTabletServerHosts(YBCServerDescriptor **servers, int * count) {
pgapi->ListTabletServers(servers, count);
YBCStatus YBCGetTabletServerHosts(YBCServerDescriptor **servers, int *count) {
const auto result = pgapi->ListTabletServers();
if (!result.ok()) {
return ToYBCStatus(result.status());
}
const auto &servers_info = result.get();
*count = servers_info.size();
*servers = NULL;
if (!servers_info.empty()) {
*servers = static_cast<YBCServerDescriptor *>(
YBCPAlloc(sizeof(YBCServerDescriptor) * servers_info.size()));
YBCServerDescriptor *dest = *servers;
for (const auto &info : servers_info) {
new (dest) YBCServerDescriptor{
.host = YBCPAllocStdString(info->hostname()),
.cloud = YBCPAllocStdString(info->cloud()),
.region = YBCPAllocStdString(info->region()),
.zone = YBCPAllocStdString(info->zone()),
.publicIp = YBCPAllocStdString(info->publicIp()),
.isPrimary = info->isPrimary(),
.pgPort = info->pg_port()};
++dest;
}
}
return YBCStatusOK();
}

//------------------------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/yb/yql/pggate/ybc_pggate.h
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ const void* YBCPgGetThreadLocalErrMsg();

void YBCPgResetCatalogReadTime();

void YBCGetTabletServerHosts(YBCServerDescriptor **tablet_servers, int* numservers);
YBCStatus YBCGetTabletServerHosts(YBCServerDescriptor **tablet_servers, int* numservers);

#ifdef __cplusplus
} // extern "C"
Expand Down

0 comments on commit c09d3c8

Please sign in to comment.