Skip to content

Commit

Permalink
[#9123] Platform: Added YBClient functionality for creating/altering/…
Browse files Browse the repository at this point in the history
…deleting/getting xCluster replication relationships

Summary:
- Added methods to AsyncYBClient and YBClient to create, alter, delete, get, and enable/disable xCluster replication relationships, as well as to bootstrap a universe
- Added request/response classes for the above functions

Test Plan: Tested by temporarily modifying endpoints created in D12227 to call the corresponding functions to ensure no errors occurred

Reviewers: daniel, rahuldesirazu

Reviewed By: daniel, rahuldesirazu

Differential Revision: https://phabricator.dev.yugabyte.com/D12299
  • Loading branch information
atarw committed Jul 30, 2021
1 parent 4c92727 commit 4ce4171
Show file tree
Hide file tree
Showing 14 changed files with 854 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
package org.yb.client;

import com.google.protobuf.Message;
import org.jboss.netty.buffer.ChannelBuffer;
import org.yb.Common;
import org.yb.master.Master;
import org.yb.util.Pair;

import java.util.List;
import java.util.UUID;

public class AlterXClusterReplicationRequest extends YRpc<AlterXClusterReplicationResponse> {

private final UUID sourceUniverseUUID;
private final List<String> sourceTableIDsToAdd;
private final List<String> sourceTableIDsToRemove;
private final List<Common.HostPortPB> sourceMasterAddresses;

AlterXClusterReplicationRequest(
YBTable table,
UUID sourceUniverseUUID,
List<String> sourceTableIDsToAdd,
List<String> sourceTableIDsToRemove,
List<Common.HostPortPB> sourceMasterAddresses) {
super(table);
this.sourceUniverseUUID = sourceUniverseUUID;
this.sourceTableIDsToAdd = sourceTableIDsToAdd;
this.sourceTableIDsToRemove = sourceTableIDsToRemove;
this.sourceMasterAddresses = sourceMasterAddresses;
}

@Override
ChannelBuffer serialize(Message header) {
assert header.isInitialized();

final Master.AlterUniverseReplicationRequestPB.Builder builder =
Master.AlterUniverseReplicationRequestPB.newBuilder()
.setProducerId(sourceUniverseUUID.toString())
.addAllProducerTableIdsToAdd(sourceTableIDsToAdd)
.addAllProducerTableIdsToRemove(sourceTableIDsToRemove)
.addAllProducerMasterAddresses(sourceMasterAddresses);

return toChannelBuffer(header, builder.build());
}

@Override
String serviceName() {
return MASTER_SERVICE_NAME;
}

@Override
String method() {
return "AlterXClusterReplication";
}

@Override
Pair<AlterXClusterReplicationResponse, Object> deserialize(
CallResponse callResponse, String tsUUID) throws Exception {
final Master.AlterUniverseReplicationResponsePB.Builder builder =
Master.AlterUniverseReplicationResponsePB.newBuilder();

readProtobuf(callResponse.getPBMessage(), builder);

final Master.MasterErrorPB error = builder.hasError() ? builder.getError() : null;

AlterXClusterReplicationResponse response =
new AlterXClusterReplicationResponse(deadlineTracker.getElapsedMillis(),
tsUUID, error);

return new Pair<>(response, error);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.yb.client;

import org.yb.annotations.InterfaceAudience;
import org.yb.master.Master;

@InterfaceAudience.Public
public class AlterXClusterReplicationResponse extends YRpcResponse {
private final Master.MasterErrorPB serverError;

AlterXClusterReplicationResponse(
long elapsedMillis, String tsUUID, Master.MasterErrorPB serverError) {
super(elapsedMillis, tsUUID);
this.serverError = serverError;
}

public boolean hasError() {
return serverError != null;
}

public String errorMessage() {
if (serverError == null) {
return "";
}

return serverError.getStatus().getMessage();
}
}
141 changes: 140 additions & 1 deletion java/yb-client/src/main/java/org/yb/client/AsyncYBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
//
package org.yb.client;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -74,6 +76,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -114,7 +117,6 @@
import org.yb.util.NetUtil;
import org.yb.util.Pair;
import org.yb.util.Slice;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
* A fully asynchronous and thread-safe client for YB.
Expand Down Expand Up @@ -781,6 +783,143 @@ public Deferred<HasUniverseKeyInMemoryResponse> hasUniverseKeyInMemory(
return d;
}

/**
* Create xCluster replication relationships between the source universe and the target universe,
* and replicate the given tables
*
* Prerequisites: tables to be replicated must exist on target universe with same name and schema.
* AsyncYBClient must be created with target universe as the context.
*
* @param sourceUniverseUUID The source universe's UUID
* @param sourceTableIDs The tables in the source universe that should be replicated
* @param sourceMasterAddresses The master addresses of the source universe
* @param sourceBootstrapIDs The bootstrap IDs for the source universe
* (optional, can pass an empty list)
*
* @return a deferred object that yields a create xCluster replication response.
* */
public Deferred<CreateXClusterReplicationResponse> createXClusterReplication(
UUID sourceUniverseUUID,
List<String> sourceTableIDs,
List<Common.HostPortPB> sourceMasterAddresses,
List<String> sourceBootstrapIDs) {
checkIsClosed();
CreateXClusterReplicationRequest request =
new CreateXClusterReplicationRequest(
this.masterTable,
sourceUniverseUUID,
sourceTableIDs,
sourceMasterAddresses,
sourceBootstrapIDs);
request.setTimeoutMillis(defaultAdminOperationTimeoutMs);
return sendRpcToTablet(request);
}

/**
* Alter existing xCluster replication relationships by modifying which tables to replicate from a
* source universe, as well as the master addresses of the source universe
*
* Prerequisites: AsyncYBClient must be created with target universe as the context.
*
* @param sourceUniverseUUID The source universe's UUID
* @param sourceTableIDsToAdd Table IDs in the source universe to start replicating from
* @param sourceTableIDsToRemove Table IDs in the source universe to stop replicating from
* @param sourceMasterAddresses New list of master addresses for the source universe
* (optional, can pass an empty list)
*
* @return a deferred object that yields an alter xCluster replication response.
* */
public Deferred<AlterXClusterReplicationResponse> alterXClusterReplication(
UUID sourceUniverseUUID,
List<String> sourceTableIDsToAdd,
List<String> sourceTableIDsToRemove,
List<Common.HostPortPB> sourceMasterAddresses) {
checkIsClosed();
AlterXClusterReplicationRequest request =
new AlterXClusterReplicationRequest(
this.masterTable,
sourceUniverseUUID,
sourceTableIDsToAdd,
sourceTableIDsToRemove,
sourceMasterAddresses);
request.setTimeoutMillis(defaultAdminOperationTimeoutMs);
return sendRpcToTablet(request);
}

/**
* Delete existing xCluster replications from a source universe to our target universe
*
* Prerequisites: AsyncYBClient must be created with target universe as the context.
*
* @param sourceUniverseUUID The source universe's UUID
*
* @return a deferred object that yields a delete xCluster replication response.
* */
public Deferred<DeleteXClusterReplicationResponse> deleteXClusterReplication(
UUID sourceUniverseUUID) {
checkIsClosed();
DeleteXClusterReplicationRequest request =
new DeleteXClusterReplicationRequest(this.masterTable, sourceUniverseUUID);
request.setTimeoutMillis(defaultAdminOperationTimeoutMs);
return sendRpcToTablet(request);
}

/**
* Gets all xCluster replication info between the source and target universe
* (tables being replicated, state of replication, etc).
*
* Prerequisites: AsyncYBClient must be created with target universe as the context.
*
* @param sourceUniverseUUID The source universe's UUID
*
* @return a deferred object that yields a get xCluster replication response.
* */
public Deferred<GetXClusterReplicationInfoResponse> getXClusterReplicationInfo(
UUID sourceUniverseUUID) {
checkIsClosed();
GetXClusterReplicationInfoRequest request =
new GetXClusterReplicationInfoRequest(this.masterTable, sourceUniverseUUID);
request.setTimeoutMillis(defaultAdminOperationTimeoutMs);
return sendRpcToTablet(request);
}

/**
* Sets existing xCluster replication relationships between the source and target universes to be
* either active or inactive
*
* Prerequisites: AsyncYBClient must be created with target universe as the context.
*
* @param sourceUniverseUUID The source universe's UUID
* @param active Whether the replication should be enabled or not
*
* @return a deferred object that yields a set xCluster replication active response.
* */
public Deferred<SetXClusterReplicationActiveResponse> setXClusterReplicationActive(
UUID sourceUniverseUUID, boolean active) {
checkIsClosed();
SetXClusterReplicationActiveRequest request =
new SetXClusterReplicationActiveRequest(this.masterTable, sourceUniverseUUID, active);
request.setTimeoutMillis(defaultAdminOperationTimeoutMs);
return sendRpcToTablet(request);
}

/**
* Creates a checkpoint of most recent op ids for all tablets of the given tables (otherwise known
* as bootstrapping)
*
* @param tableIDs List of table IDs to create checkpoints for
*
* @return a deferred object that yields a bootstrap universe response which contains a list of
* bootstrap IDs corresponding to the same order of table IDs.
* */
public Deferred<BootstrapUniverseResponse> bootstrapUniverse(List<String> tableIDs) {
checkIsClosed();
BootstrapUniverseRequest request =
new BootstrapUniverseRequest(this.masterTable, tableIDs);
request.setTimeoutMillis(defaultAdminOperationTimeoutMs);
return sendRpcToTablet(request);
}

/**
* Change Master Configuration request handler.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
package org.yb.client;

import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import org.jboss.netty.buffer.ChannelBuffer;
import org.yb.cdc.CdcService;
import org.yb.util.Pair;

import java.util.List;
import java.util.stream.Collectors;

public class BootstrapUniverseRequest extends YRpc<BootstrapUniverseResponse> {

private final List<String> tableIDs;

BootstrapUniverseRequest(YBTable table, List<String> tableIDs) {
super(table);
this.tableIDs = tableIDs;
}

@Override
ChannelBuffer serialize(Message header) {
assert header.isInitialized();

final CdcService.BootstrapProducerRequestPB.Builder builder =
CdcService.BootstrapProducerRequestPB.newBuilder()
.addAllTableIds(tableIDs);

return toChannelBuffer(header, builder.build());
}

@Override
String serviceName() {
return MASTER_SERVICE_NAME;
}

@Override
String method() {
return "BootstrapUniverse";
}

@Override
Pair<BootstrapUniverseResponse, Object> deserialize(
CallResponse callResponse, String tsUUID) throws Exception {
final CdcService.BootstrapProducerResponsePB.Builder builder =
CdcService.BootstrapProducerResponsePB.newBuilder();

readProtobuf(callResponse.getPBMessage(), builder);

final CdcService.CDCErrorPB error = builder.hasError() ? builder.getError() : null;
final List<String> bootstrapIDs = builder
.getCdcBootstrapIdsList()
.stream()
.map(ByteString::toString)
.collect(Collectors.toList());

BootstrapUniverseResponse response =
new BootstrapUniverseResponse(deadlineTracker.getElapsedMillis(),
tsUUID, error, bootstrapIDs);

return new Pair<>(response, error);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.yb.client;

import org.yb.annotations.InterfaceAudience;
import org.yb.cdc.CdcService;
import org.yb.master.Master;

import java.util.List;

@InterfaceAudience.Public
public class BootstrapUniverseResponse extends YRpcResponse {
private final CdcService.CDCErrorPB cdcError;
private final List<String> bootstrapIDs;

BootstrapUniverseResponse(
long elapsedMillis, String tsUUID, CdcService.CDCErrorPB cdcError, List<String> bootstrapIDs) {
super(elapsedMillis, tsUUID);
this.cdcError = cdcError;
this.bootstrapIDs = bootstrapIDs;
}

public boolean hasError() {
return cdcError != null;
}

public String errorMessage() {
if (cdcError == null) {
return "";
}

return cdcError.getStatus().getMessage();
}

public List<String> bootstrapIDs() {
return bootstrapIDs;
}
}
Loading

0 comments on commit 4ce4171

Please sign in to comment.