Skip to content

Commit

Permalink
[PLAT-14073] DB scoped failover+repair
Browse files Browse the repository at this point in the history
Summary: Enable DB scoped failover + repair.

Test Plan:
Set up DB scoped DR between A and B.
Do a failover, then repair back to A.
Do another failover, then repair back to B.

Log in to both universes, check replication status
```
Outbound Replication Group: baef4b9e-99dc-4fea-ab12-98b0b5c500a4_--DR-CONFIG-dr-config-kind-magenta-lobster-0

Namespace ID: 00004104000030008000000000000000
Namespace name: yugabyte
Table Id		Stream Id
00004104000030008000000000004003		c45e187412d94ca0ad44f308c0d879ff
00004104000030008000000000004000		f581e6143bdde293c04c0f83ad75dfcc
```
Use ysqlsh to check that table contents are the same on both source and target after repair.

This diff also has fixes for DB scoped switchover that broke with the changes to support DB scoped bootstrapping, so repeat test plan https://phorge.dev.yugabyte.com/D35907 too.

Reviewers: cwang, hzare, sanketh, spothuraju

Reviewed By: cwang

Subscribers: yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D36537
  • Loading branch information
artem-mindrov committed Jul 22, 2024
1 parent 50422f8 commit 7c55b95
Show file tree
Hide file tree
Showing 10 changed files with 615 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.yugabyte.yw.forms.DrConfigCreateForm;
import com.yugabyte.yw.forms.RestoreBackupParams;
import com.yugabyte.yw.forms.XClusterConfigCreateFormData;
import com.yugabyte.yw.forms.XClusterConfigCreateFormData.BootstrapParams.BootstrapBackupParams;
import com.yugabyte.yw.models.Backup;
import com.yugabyte.yw.models.Customer;
import com.yugabyte.yw.models.PitrConfig;
Expand Down Expand Up @@ -86,10 +87,7 @@ protected boolean checkBootstrapRequired(
}

Duration xclusterWaitTimeout =
this.confGetter.getConfForScope(sourceUniverse, UniverseConfKeys.xclusterSetupAlterTimeout);
long sleepTimeMs;
int iterationNum = 0;
Duration currentElapsedTime;
confGetter.getConfForScope(sourceUniverse, UniverseConfKeys.xclusterSetupAlterTimeout);

try (YBClient client =
ybService.getClient(
Expand Down Expand Up @@ -333,6 +331,15 @@ protected void addSubtasksToCreateXClusterConfig(
}
}

protected void addSubtasksToCreateXClusterConfig(
XClusterConfig xClusterConfig,
Set<String> sourceDbIds,
@Nullable DrConfigCreateForm.PitrParams pitrParams,
boolean isForceBootstrap) {
addSubtasksToCreateXClusterConfig(
xClusterConfig, null, null, null, sourceDbIds, pitrParams, isForceBootstrap);
}

protected void addSubtasksToCreateXClusterConfig(
XClusterConfig xClusterConfig,
Set<String> sourceDbIds,
Expand Down Expand Up @@ -395,9 +402,8 @@ protected void addSubtasksForTablesNotNeedBootstrap(
}

// Set up PITRs for txn/db scoped xCluster.
if (xClusterConfig.getType() != (ConfigType.Basic)) {
if (xClusterConfig.getType() != ConfigType.Basic) {
Set<MasterTypes.NamespaceIdentifierPB> namespaces;

if (xClusterConfig.getType() == ConfigType.Db) {
namespaces = getNamespaces(sourceUniverse, sourceDbIds);
if (namespaces.size() != sourceDbIds.size()) {
Expand Down Expand Up @@ -533,7 +539,12 @@ protected void addSubtasksForTablesNeedBootstrap(
// For db scoped replication, we will skip backup/restore subtasks at runtime if bootstrapping
// is not required. For non-db scoped replication, we always perform backup restore here.
Predicate<ITask> bootstrapRequiredPredicate =
(task) -> {
task -> {
if (xClusterConfig.isUsedForDr()
&& xClusterConfig.getDrConfig().getFailoverXClusterConfig() != null) {
return false;
}

if (xClusterConfig.getType() == ConfigType.Db) {
return checkBootstrapRequired(namespaceId, ybService, sourceUniverse, xClusterConfig);
}
Expand Down Expand Up @@ -598,13 +609,21 @@ protected void addSubtasksForTablesNeedBootstrap(
sourceUniverse.isYbcEnabled()
&& targetUniverse.isYbcEnabled()
&& confGetter.getGlobalConf(GlobalConfKeys.enableYbcForXCluster);

BootstrapBackupParams backupParams = null;
if (bootstrapParams == null) {
if (xClusterConfig.isUsedForDr()) {
backupParams = new BootstrapBackupParams();
backupParams.storageConfigUUID = xClusterConfig.getDrConfig().getStorageConfigUuid();
backupParams.parallelism = xClusterConfig.getDrConfig().getParallelism();
}
} else {
backupParams = bootstrapParams.backupRequestParams;
}

BackupRequestParams backupRequestParams =
getBackupRequestParams(
sourceUniverse,
bootstrapParams,
tablesInfoListNeedBootstrap,
namespaceName,
tableType);
sourceUniverse, backupParams, tablesInfoListNeedBootstrap, namespaceName, tableType);
Backup backup =
createAllBackupSubtasks(
backupRequestParams,
Expand Down Expand Up @@ -667,7 +686,8 @@ protected void addSubtasksForTablesNeedBootstrap(
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.RestoringBackup);
} else if (tableType == CommonTypes.TableType.PGSQL_TABLE_TYPE) {
// Delete hanging replication streams, otherwise deleting the database will fail.
createDeleteRemnantStreamsTask(targetUniverse.getUniverseUUID(), namespaceName);
createDeleteRemnantStreamsTask(targetUniverse.getUniverseUUID(), namespaceName)
.setShouldRunPredicate(bootstrapRequiredPredicate);
// If the table type is YSQL, delete the database from the target universe before restore.
createDeleteKeySpaceTask(
namespaceName, CommonTypes.TableType.PGSQL_TABLE_TYPE, true /*ysqlForce*/)
Expand Down Expand Up @@ -848,18 +868,18 @@ protected void addSubtasksForTablesNeedBootstrap(
return dbToTablesInfoMapNeedBootstrap;
}

static BackupRequestParams getBackupRequestParams(
private static BackupRequestParams getBackupRequestParams(
Universe sourceUniverse,
XClusterConfigCreateFormData.BootstrapParams bootstrapParams,
BootstrapBackupParams backupParams,
@Nullable
List<MasterDdlOuterClass.ListTablesResponsePB.TableInfo> tablesInfoListNeedBootstrap,
String namespaceName,
CommonTypes.TableType tableType) {
BackupRequestParams backupRequestParams;
if (bootstrapParams != null && bootstrapParams.backupRequestParams != null) {
if (backupParams != null) {
backupRequestParams = new BackupRequestParams();
backupRequestParams.storageConfigUUID = bootstrapParams.backupRequestParams.storageConfigUUID;
backupRequestParams.parallelism = bootstrapParams.backupRequestParams.parallelism;
backupRequestParams.storageConfigUUID = backupParams.storageConfigUUID;
backupRequestParams.parallelism = backupParams.parallelism;
} else {
// In case the user does not pass the backup parameters, use the default values.
backupRequestParams = new BackupRequestParams();
Expand All @@ -869,7 +889,7 @@ static BackupRequestParams getBackupRequestParams(
Backup.fetchLatestByState(backupRequestParams.customerUUID, Backup.BackupState.Completed);
if (latestCompletedBackupOptional.isEmpty()) {
throw new RuntimeException(
"bootstrapParams in XClusterConfigCreateFormData is null, and storageConfigUUID "
"backupParams in XClusterConfigCreateFormData is null, and storageConfigUUID "
+ "cannot be determined based on the latest successful backup");
}
backupRequestParams.storageConfigUUID =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,22 @@ protected void addSubtasksToUseNewXClusterConfig(

createXClusterConfigSetStatusTask(newXClusterConfig, XClusterConfigStatusType.Updating);

createXClusterConfigSetStatusForTablesTask(
newXClusterConfig,
getTableIds(taskParams().getTableInfoList()),
XClusterTableConfig.Status.Updating);

addSubtasksToCreateXClusterConfig(
newXClusterConfig,
taskParams().getTableInfoList(),
taskParams().getMainTableIndexTablesMap(),
taskParams().getSourceTableIdsWithNoTableOnTargetUniverse(),
taskParams().getPitrParams());
if (newXClusterConfig.getType() == XClusterConfig.ConfigType.Db) {
addSubtasksToCreateXClusterConfig(
newXClusterConfig, taskParams().getDbs(), taskParams().getPitrParams());
} else {
createXClusterConfigSetStatusForTablesTask(
newXClusterConfig,
getTableIds(taskParams().getTableInfoList()),
XClusterTableConfig.Status.Updating);

addSubtasksToCreateXClusterConfig(
newXClusterConfig,
taskParams().getTableInfoList(),
taskParams().getMainTableIndexTablesMap(),
taskParams().getSourceTableIdsWithNoTableOnTargetUniverse(),
taskParams().getPitrParams());
}

createXClusterConfigSetStatusTask(newXClusterConfig, XClusterConfigStatusType.Running)
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.ConfigureUniverse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@
import com.yugabyte.yw.models.Restore;
import com.yugabyte.yw.models.Universe;
import com.yugabyte.yw.models.XClusterConfig;
import com.yugabyte.yw.models.XClusterConfig.ConfigType;
import com.yugabyte.yw.models.XClusterConfig.XClusterConfigStatusType;
import com.yugabyte.yw.models.XClusterTableConfig;
import java.util.List;
import java.util.Set;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.yb.master.MasterDdlOuterClass;

@Slf4j
public class RestartXClusterConfig extends EditXClusterConfig {
Expand Down Expand Up @@ -44,32 +47,42 @@ public void run() {
createCheckXUniverseAutoFlag(sourceUniverse, targetUniverse)
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.PreflightChecks);

// Set table type for old xCluster configs.
xClusterConfig.updateTableType(taskParams().getTableInfoList());
// TODO full DB scoped restart support
List<MasterDdlOuterClass.ListTablesResponsePB.TableInfo> tableInfoList =
taskParams().getTableInfoList();
Set<String> tableIds = getTableIds(tableInfoList);

// Do not skip bootstrapping for the following tables. It will check if it is required.
if (taskParams().getBootstrapParams() != null) {
xClusterConfig.updateNeedBootstrapForTables(
taskParams().getBootstrapParams().tables, true /* needBootstrap */);
}
boolean isRestartWholeConfig;
if (xClusterConfig.getType() != ConfigType.Db) {
// Set table type for old xCluster configs.
xClusterConfig.updateTableType(tableInfoList);

createXClusterConfigSetStatusTask(
xClusterConfig, XClusterConfig.XClusterConfigStatusType.Updating)
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.DeleteXClusterReplication);
// Do not skip bootstrapping for the following tables. It will check if it is required.
if (taskParams().getBootstrapParams() != null) {
xClusterConfig.updateNeedBootstrapForTables(
taskParams().getBootstrapParams().tables, true /* needBootstrap */);
}

Set<String> tableIds = getTableIds(taskParams().getTableInfoList());
createXClusterConfigSetStatusTask(
xClusterConfig, XClusterConfig.XClusterConfigStatusType.Updating)
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.DeleteXClusterReplication);

// A replication group with no tables in it cannot exist in YBDB. If all the tables
// must be removed from the replication group, remove the replication group.
isRestartWholeConfig =
xClusterConfig.getTableIdsWithReplicationSetup(tableIds, true /* done */).size()
>= xClusterConfig.getTableIdsWithReplicationSetup().size();
} else {
isRestartWholeConfig =
taskParams().getDbs().size() == xClusterConfig.getNamespaces().size();
}

// A replication group with no tables in it cannot exist in YBDB. If all the tables must be
// removed from the replication group, remove the replication group.
boolean isRestartWholeConfig =
xClusterConfig.getTableIdsWithReplicationSetup(tableIds, true /* done */).size()
>= xClusterConfig.getTableIdsWithReplicationSetup().size();
log.info("isRestartWholeConfig is {}", isRestartWholeConfig);
if (isRestartWholeConfig) {
createXClusterConfigSetStatusForTablesTask(
xClusterConfig,
getTableIds(taskParams().getTableInfoList()),
XClusterTableConfig.Status.Updating);
if (xClusterConfig.getType() != ConfigType.Db) {
createXClusterConfigSetStatusForTablesTask(
xClusterConfig, getTableIds(tableInfoList), XClusterTableConfig.Status.Updating);
}

// Delete the replication group.
createDeleteXClusterConfigSubtasks(
Expand All @@ -91,19 +104,25 @@ public void run() {
createXClusterConfigSetStatusTask(
xClusterConfig, XClusterConfig.XClusterConfigStatusType.Updating);

createXClusterConfigSetStatusForTablesTask(
xClusterConfig,
getTableIds(taskParams().getTableInfoList()),
XClusterTableConfig.Status.Updating);

addSubtasksToCreateXClusterConfig(
xClusterConfig,
taskParams().getTableInfoList(),
taskParams().getMainTableIndexTablesMap(),
taskParams().getSourceTableIdsWithNoTableOnTargetUniverse(),
null,
taskParams().getPitrParams(),
taskParams().isForceBootstrap());
if (xClusterConfig.getType() == ConfigType.Db) {
addSubtasksToCreateXClusterConfig(
xClusterConfig,
taskParams().getDbs(),
taskParams().getPitrParams(),
taskParams().isForceBootstrap());
} else {
createXClusterConfigSetStatusForTablesTask(
xClusterConfig, getTableIds(tableInfoList), XClusterTableConfig.Status.Updating);

addSubtasksToCreateXClusterConfig(
xClusterConfig,
tableInfoList,
taskParams().getMainTableIndexTablesMap(),
taskParams().getSourceTableIdsWithNoTableOnTargetUniverse(),
null,
taskParams().getPitrParams(),
taskParams().isForceBootstrap());
}
} else {
createXClusterConfigSetStatusForTablesTask(
xClusterConfig, tableIds, XClusterTableConfig.Status.Updating);
Expand All @@ -115,10 +134,7 @@ public void run() {
xClusterConfig, tableIds, XClusterTableConfig.Status.Updating);

addSubtasksToAddTablesToXClusterConfig(
xClusterConfig,
taskParams().getTableInfoList(),
taskParams().getMainTableIndexTablesMap(),
tableIds);
xClusterConfig, tableInfoList, taskParams().getMainTableIndexTablesMap(), tableIds);
}

createXClusterConfigSetStatusTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5485,9 +5485,12 @@ protected void createDeleteXClusterConfigSubtasks(
createDeleteReplicationTask(xClusterConfig, forceDelete)
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.DeleteXClusterReplication);
if (xClusterConfig.getType() == ConfigType.Db) {
// TODO: add forceDelete.
createDeleteReplicationOnSourceTask(xClusterConfig)
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.DeleteXClusterReplication);
// If it's in the middle of a repair, there's no replication on source.
if (!(xClusterConfig.isUsedForDr() && xClusterConfig.getDrConfig().isHalted())) {
// TODO: add forceDelete.
createDeleteReplicationOnSourceTask(xClusterConfig)
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.DeleteXClusterReplication);
}
} else {
// Delete bootstrap IDs created by bootstrap universe subtask.
createDeleteBootstrapIdsTask(xClusterConfig, xClusterConfig.getTableIds(), forceDelete)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2043,6 +2043,32 @@ public static Map<String, String> getIndexTableIdToParentTableIdMap(
tableInfo -> getTableId(tableInfo), tableInfo -> tableInfo.getIndexedTableId()));
}

public static List<MasterDdlOuterClass.ListTablesResponsePB.TableInfo> getRequestedTableInfoList(
Set<String> dbIds,
List<MasterDdlOuterClass.ListTablesResponsePB.TableInfo> sourceTableInfoList) {
List<MasterDdlOuterClass.ListTablesResponsePB.TableInfo> requestedTableInfoList =
sourceTableInfoList.stream()
.filter(
tableInfo ->
isXClusterSupported(tableInfo)
&& dbIds.contains(tableInfo.getNamespace().getId().toStringUtf8()))
.collect(Collectors.toList());
Set<String> foundDbIds =
requestedTableInfoList.stream()
.map(tableInfo -> tableInfo.getNamespace().getId().toStringUtf8())
.collect(Collectors.toSet());
// Ensure all DB names are found.
if (foundDbIds.size() != dbIds.size()) {
Set<String> missingDbIds =
dbIds.stream().filter(dbId -> !foundDbIds.contains(dbId)).collect(Collectors.toSet());
throw new IllegalArgumentException(
String.format(
"Some of the DB ids were not found: was %d, found %d, missing dbs: %s",
dbIds.size(), foundDbIds.size(), missingDbIds));
}
return requestedTableInfoList;
}

// DR methods.
// --------------------------------------------------------------------------------
protected DrConfig getDrConfigFromTaskParams() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public void run() {
if (!Set.of(ConfigType.Txn, ConfigType.Db).contains(xClusterConfig.getType())) {
throw new IllegalArgumentException(
String.format(
"WaitForReplicationDrain only works for Txn xCluster; the current type is %s",
"WaitForReplicationDrain only works for Txn or DB scoped xCluster; the current type"
+ " is %s",
xClusterConfig.getType()));
}

Expand Down Expand Up @@ -91,6 +92,8 @@ public void run() {
universe.getUniverseUUID(), xClusterConfig, rgInfo.errorMessage()));
}

log.debug("Got namespace infos: {}", rgInfo.getNamespaceInfos());

rgInfo.getNamespaceInfos().stream()
.filter(i -> xClusterConfig.getDbIds().contains(i.getNamespaceId()))
.forEach(i -> activeStreamIds.addAll(i.getTableStreamsMap().values()));
Expand Down
Loading

0 comments on commit 7c55b95

Please sign in to comment.