Skip to content

Commit

Permalink
[PLAT-14034][PLAT-14762] Support TLS for db scoped replication
Browse files Browse the repository at this point in the history
Summary:
Allow users to set up db scoped xCluster replication with TLS enabled on both source and target universes. Certificates will now be transferred bidirectionally, i.e., instead of only copying the source universe's certificates to the target nodes (by default at `{yb-home-dir}/yugabyte-tls-producer/{replication-group-name}/ca.crt`), we also copy the target universe's certificate to the same directory on the source universe's nodes.

Also, nodes added during the add node task, edit universe actions, and certain rolling upgrades are now handled so that any new or re-provisioned nodes have the correct certificates when db scoped xCluster replication is configured.

Test Plan:
Added a local provider test which creates two RF1 single-node universes with TLS enabled. DB scoped replication is then set up and a full move is performed on both the source and target universes. Assertions are made between these tasks to ensure that replication is working successfully.

Manual testing:
Create two rf3 universes (with any number of nodes) with TLS enabled.
1. Create a bunch of tables on both universes:
```
-- Connect to colocated db
\c col_true_db

-- Create colocated tables.
CREATE TABLE COMPANY(
   ID INT PRIMARY KEY     NOT NULL,
   NAME           TEXT    NOT NULL
) with (COLOCATION_ID = 20000);

-- Create index in colocated table
create index on company(id) with (COLOCATION_ID = 20002);

-- Create table with colocation = false.
CREATE TABLE NOTCOLOCATEDTABLE(
   ID INT PRIMARY KEY     NOT NULL,
   NAME           TEXT    NOT NULL
) with (colocation=false);

create index on notcolocatedtable(id);

-- Connect to non-colocated db
\c non_colocated_db

-- Create normal tables.
CREATE TABLE house(
   ID INT PRIMARY KEY     NOT NULL,
   LOCATION           TEXT    NOT NULL
);

CREATE TABLE ROOM(
   ID INT PRIMARY KEY     NOT NULL,
   size           TEXT    NOT NULL
);

create index on house(id);
```

2. Create db scoped xcluster DR from the UI.
3. Perform switchover.
4. Perform failover + repair
5. Perform full move on both source + target universe.
6. After each of steps 2-5, validate that replication works as expected by performing insertions.
For example: `insert into company (id, name) values (1, 'yugabyte');`

Reviewers: hzare, jmak, sanketh, spothuraju, amindrov

Reviewed By: hzare

Subscribers: yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D36870
  • Loading branch information
charleswang234 committed Aug 6, 2024
1 parent 927a0f9 commit ef62972
Show file tree
Hide file tree
Showing 18 changed files with 417 additions and 167 deletions.
32 changes: 16 additions & 16 deletions managed/devops/opscli/ybops/cloud/common/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,28 +533,28 @@ def copy_xcluster_root_cert(
connect_options,
root_cert_path,
replication_config_name,
producer_certs_dir):
xcluster_dest_certs_dir):
host_port_user = get_host_port_user(connect_options)
remote_shell = RemoteShell(connect_options)
node_ip = host_port_user["host"]
src_root_cert_dir_path = os.path.join(producer_certs_dir, replication_config_name)
src_root_cert_path = os.path.join(src_root_cert_dir_path, self.ROOT_CERT_NAME)
xcluster_root_cert_dir_path = os.path.join(xcluster_dest_certs_dir, replication_config_name)
xcluster_root_cert_path = os.path.join(xcluster_root_cert_dir_path, self.ROOT_CERT_NAME)
logging.info("Moving server cert located at {} to {}:{}.".format(
root_cert_path, node_ip, src_root_cert_dir_path))
root_cert_path, node_ip, xcluster_root_cert_dir_path))

remote_shell.run_command('mkdir -p ' + src_root_cert_dir_path)
remote_shell.run_command('mkdir -p ' + xcluster_root_cert_dir_path)
# Give write permissions. If the command fails, ignore.
remote_shell.run_command('chmod -f 666 {}/* || true'.format(src_root_cert_dir_path))
remote_shell.put_file(root_cert_path, src_root_cert_path)
remote_shell.run_command('chmod -f 666 {}/* || true'.format(xcluster_root_cert_dir_path))
remote_shell.put_file(root_cert_path, xcluster_root_cert_path)

# Reset the write permission as a sanity check.
remote_shell.run_command('chmod 400 {}/*'.format(src_root_cert_dir_path))
remote_shell.run_command('chmod 400 {}/*'.format(xcluster_root_cert_dir_path))

def remove_xcluster_root_cert(
self,
connect_options,
replication_config_name,
producer_certs_dir):
xcluster_dest_certs_dir):
def check_rm_result(rm_result):
if rm_result.exited and rm_result.stderr.find("No such file or directory") == -1:
raise YBOpsRuntimeError(
Expand All @@ -565,19 +565,19 @@ def check_rm_result(rm_result):
host_port_user = get_host_port_user(connect_options)
remote_shell = RemoteShell(connect_options)
node_ip = host_port_user["host"]
src_root_cert_dir_path = os.path.join(producer_certs_dir, replication_config_name)
src_root_cert_path = os.path.join(src_root_cert_dir_path, self.ROOT_CERT_NAME)
xcluster_root_cert_dir_path = os.path.join(xcluster_dest_certs_dir, replication_config_name)
xcluster_root_cert_path = os.path.join(xcluster_root_cert_dir_path, self.ROOT_CERT_NAME)
logging.info("Removing server cert located at {} from server {}.".format(
src_root_cert_dir_path, node_ip))
xcluster_root_cert_dir_path, node_ip))

remote_shell.run_command('chmod -f 666 {}/* || true'.format(src_root_cert_dir_path))
result = remote_shell.run_command_raw('rm ' + src_root_cert_path)
remote_shell.run_command('chmod -f 666 {}/* || true'.format(xcluster_root_cert_dir_path))
result = remote_shell.run_command_raw('rm ' + xcluster_root_cert_path)
check_rm_result(result)
# Remove the directory only if it is empty.
result = remote_shell.run_command_raw('rm -d ' + src_root_cert_dir_path)
result = remote_shell.run_command_raw('rm -d ' + xcluster_root_cert_dir_path)
check_rm_result(result)
# No need to check the result of this command.
remote_shell.run_command_raw('rm -d ' + producer_certs_dir)
remote_shell.run_command_raw('rm -d ' + xcluster_dest_certs_dir)

def copy_client_certs(
self,
Expand Down
8 changes: 4 additions & 4 deletions managed/devops/opscli/ybops/cloud/common/method.py
Original file line number Diff line number Diff line change
Expand Up @@ -1876,9 +1876,9 @@ def add_extra_args(self):
required=True,
help="The format of this name must be "
"[Source universe UUID]_[Config name].")
self.parser.add_argument("--producer_certs_dir",
self.parser.add_argument("--xcluster_dest_certs_dir",
required=True,
help="The directory containing the certs on the target universe.")
help="The directory containing the certs on destination universe.")
self.parser.add_argument("--action",
default="copy",
help="If true, the root certificate will be removed.")
Expand Down Expand Up @@ -1913,12 +1913,12 @@ def callback(self, args):
connect_options,
args.root_cert_path,
args.replication_config_name,
args.producer_certs_dir)
args.xcluster_dest_certs_dir)
elif args.action == "remove":
self.cloud.remove_xcluster_root_cert(
connect_options,
args.replication_config_name,
args.producer_certs_dir)
args.xcluster_dest_certs_dir)
else:
raise YBOpsRuntimeError("The action \"{}\" was not found: Must be either copy, "
"or remove".format(args.action))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,15 +269,7 @@ protected void addSubtasksToCreateXClusterConfig(
}

// Support mismatched TLS root certificates.
Optional<File> sourceCertificate =
getSourceCertificateIfNecessary(sourceUniverse, targetUniverse);
sourceCertificate.ifPresent(
cert ->
createTransferXClusterCertsCopyTasks(
targetUniverse.getNodes(),
xClusterConfig.getReplicationGroupName(),
cert,
targetUniverse.getUniverseDetails().getSourceRootCertDirPath()));
createTransferXClusterCertsCopyTasks(xClusterConfig);

Map<String, List<MasterDdlOuterClass.ListTablesResponsePB.TableInfo>>
dbToTablesInfoMapNeedBootstrap = null;
Expand Down Expand Up @@ -998,4 +990,36 @@ static RestoreBackupParams getRestoreBackupParams(

return restoreTaskParams;
}

/**
 * Convenience overload that transfers xCluster certificates using the replication group name
 * currently returned by the given config.
 *
 * @param xClusterConfig config with source and target universe to transfer certs.
 */
public void createTransferXClusterCertsCopyTasks(XClusterConfig xClusterConfig) {
  String replicationGroupName = xClusterConfig.getReplicationGroupName();
  createTransferXClusterCertsCopyTasks(xClusterConfig, replicationGroupName);
}

/**
 * Transfer source universe certs -> target universe nodes. When the config is db scoped, also
 * transfer target universe certs -> source universe nodes.
 *
 * <p>A transfer in a given direction is only scheduled when {@code
 * getOriginCertficateIfNecessary} yields a certificate for that direction; the conditions under
 * which it does so are defined by that helper (not shown here).
 *
 * @param xClusterConfig config with source and target universe to transfer certs.
 * @param replicationGroupName name of the replication group for xClusterConfig.
 */
public void createTransferXClusterCertsCopyTasks(
    XClusterConfig xClusterConfig, String replicationGroupName) {
  Universe sourceUniverse = Universe.getOrBadRequest(xClusterConfig.getSourceUniverseUUID());
  Universe targetUniverse = Universe.getOrBadRequest(xClusterConfig.getTargetUniverseUUID());

  // Source -> target direction: applies to every config type.
  Optional<File> certForTarget = getOriginCertficateIfNecessary(sourceUniverse, targetUniverse);
  if (certForTarget.isPresent()) {
    createTransferXClusterCertsCopyTasks(
        targetUniverse.getNodes(), replicationGroupName, certForTarget.get(), targetUniverse);
  }

  // Only db scoped configs need the reverse (target -> source) direction.
  if (xClusterConfig.getType() != ConfigType.Db) {
    return;
  }

  Optional<File> certForSource = getOriginCertficateIfNecessary(targetUniverse, sourceUniverse);
  if (certForSource.isPresent()) {
    createTransferXClusterCertsCopyTasks(
        sourceUniverse.getNodes(), replicationGroupName, certForSource.get(), sourceUniverse);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
import com.yugabyte.yw.models.XClusterConfig.XClusterConfigStatusType;
import com.yugabyte.yw.models.XClusterNamespaceConfig;
import com.yugabyte.yw.models.XClusterTableConfig;
import java.io.File;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -71,29 +69,17 @@ public void run() {
String oldReplicationGroupName = xClusterConfig.getReplicationGroupName();
// If TLS root certificates are different, create a directory containing the source
// universe root certs with the new name.
Optional<File> sourceCertificate =
getSourceCertificateIfNecessary(sourceUniverse, targetUniverse);
sourceCertificate.ifPresent(
cert ->
createTransferXClusterCertsCopyTasks(
targetUniverse.getNodes(),
xClusterConfig.getNewReplicationGroupName(
xClusterConfig.getSourceUniverseUUID(), editFormData.name),
cert,
targetUniverse.getUniverseDetails().getSourceRootCertDirPath()));
createTransferXClusterCertsCopyTasks(
xClusterConfig,
xClusterConfig.getNewReplicationGroupName(
xClusterConfig.getSourceUniverseUUID(), editFormData.name));

createXClusterConfigRenameTask(xClusterConfig, editFormData.name)
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.ConfigureUniverse);

// Delete the old directory if it created a new one. When the old directory is removed
// because of renaming, the directory for transactional replication must not be deleted.
sourceCertificate.ifPresent(
cert ->
createTransferXClusterCertsRemoveTasks(
xClusterConfig,
oldReplicationGroupName,
targetUniverse.getUniverseDetails().getSourceRootCertDirPath(),
false /* ignoreErrors */));
createTransferXClusterCertsRemoveTasks(xClusterConfig, oldReplicationGroupName);
} else if (editFormData.status != null) {
createSetReplicationPausedTask(xClusterConfig, editFormData.status)
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.ConfigureUniverse);
Expand Down
Loading

0 comments on commit ef62972

Please sign in to comment.