Skip to content

Commit

Permalink
Fix testUpgradeAcrossVersionsWithUnsupportedKafkaVersion test to run …
Browse files Browse the repository at this point in the history
…only when there is really an unsupported Kafka version across versions
  • Loading branch information
egyedt committed Sep 19, 2024
1 parent 8312d63 commit 5ef7836
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

import io.strimzi.systemtest.utils.TestKafkaVersion;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Class for representing Kafka version, with LMFV and IBPV for our upgrade/downgrade tests
* Represents "procedures" which should be done after upgrade of operator/before downgrade of operator
Expand All @@ -23,21 +27,21 @@ public UpgradeKafkaVersion(TestKafkaVersion testKafkaVersion) {

public UpgradeKafkaVersion(String version, String desiredMetadataVersion) {
this.version = version;
this.metadataVersion = desiredMetadataVersion;
metadataVersion = desiredMetadataVersion;
}

public UpgradeKafkaVersion(String version) {
String shortVersion = version;

if (version != null && !version.equals("")) {
if (version != null && !version.isEmpty()) {
String[] versionSplit = version.split("\\.");
shortVersion = String.format("%s.%s", versionSplit[0], versionSplit[1]);
}

this.version = version;
this.logMessageVersion = shortVersion;
this.interBrokerVersion = shortVersion;
this.metadataVersion = shortVersion;
logMessageVersion = shortVersion;
interBrokerVersion = shortVersion;
metadataVersion = shortVersion;
}

/**
Expand Down Expand Up @@ -71,19 +75,19 @@ public String getVersion() {
}

public String getLogMessageVersion() {
return this.logMessageVersion;
return logMessageVersion;
}

public String getInterBrokerVersion() {
return this.interBrokerVersion;
return interBrokerVersion;
}

public String getMetadataVersion() {
return this.metadataVersion;
return metadataVersion;
}

public static UpgradeKafkaVersion getKafkaWithVersionFromUrl(String kafkaVersionsUrl, String kafkaVersion) {
if (kafkaVersionsUrl.equals("HEAD")) {
if ("HEAD".equals(kafkaVersionsUrl)) {
return new UpgradeKafkaVersion(TestKafkaVersion.getSpecificVersion(kafkaVersion));
} else {
try {
Expand All @@ -96,4 +100,31 @@ public static UpgradeKafkaVersion getKafkaWithVersionFromUrl(String kafkaVersion
}
}
}

public static Optional<UpgradeKafkaVersion> getKafkaVersionSupportedBeforeUnsupportedAfterUpgrade(String fromKafkaVersionsUrl) {
List<TestKafkaVersion> supportedKafkaVersionsBeforeUpgrade = getSupportedKafkaVersions(fromKafkaVersionsUrl);
List<String> supportedKafkaVersionsAfterUpgrade = getSupportedKafkaVersions("HEAD")
.stream()
.map(TestKafkaVersion::version)
.collect(Collectors.toList());

return supportedKafkaVersionsBeforeUpgrade
.stream()
.filter(version -> !supportedKafkaVersionsAfterUpgrade.contains(version.version()))
.map(UpgradeKafkaVersion::new)
.findFirst();
}

private static List<TestKafkaVersion> getSupportedKafkaVersions(String kafkaVersionsUrl) {
if ("HEAD".equals(kafkaVersionsUrl)) {
return TestKafkaVersion.getSupportedKafkaVersions();
} else {
try {
List<TestKafkaVersion> kafkaVersions = TestKafkaVersion.parseKafkaVersionsFromUrl(kafkaVersionsUrl);
return TestKafkaVersion.getSupportedKafkaVersionsFromAllVersions(kafkaVersions);
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class TestKafkaVersion implements Comparable<TestKafkaVersion> {
static {
try {
kafkaVersions = parseKafkaVersions(TestUtils.USER_PATH + "/../kafka-versions.yaml");
supportedKafkaVersions = kafkaVersions.stream().filter(TestKafkaVersion::isSupported).collect(Collectors.toList());
supportedKafkaVersions = getSupportedKafkaVersionsFromAllVersions(kafkaVersions);
Collections.sort(kafkaVersions);
Collections.sort(supportedKafkaVersions);

Expand Down Expand Up @@ -181,6 +181,10 @@ public static List<TestKafkaVersion> getSupportedKafkaVersions() {
return supportedKafkaVersions;
}

public static List<TestKafkaVersion> getSupportedKafkaVersionsFromAllVersions(List<TestKafkaVersion> kafkaVersions) {
return kafkaVersions.stream().filter(TestKafkaVersion::isSupported).collect(Collectors.toList());
}

/**
* Parse the version information present in the {@code /kafka-versions} classpath resource and return a map
* of kafka versions data with a version as key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

import static io.strimzi.systemtest.Environment.TEST_SUITE_NAMESPACE;
import static io.strimzi.systemtest.TestConstants.CO_NAMESPACE;
Expand All @@ -55,7 +56,7 @@ public class StrimziUpgradeST extends AbstractUpgradeST {
@ParameterizedTest(name = "from: {0} (using FG <{2}>) to: {1} (using FG <{3}>)")
@MethodSource("io.strimzi.systemtest.upgrade.VersionModificationDataLoader#loadYamlUpgradeData")
void testUpgradeOfKafkaKafkaConnectAndKafkaConnector(String fromVersion, String toVersion, String fgBefore, String fgAfter, BundleVersionModificationData upgradeData) throws IOException {
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
UpgradeKafkaVersion upgradeKafkaVersion = new UpgradeKafkaVersion(upgradeData.getOldestKafka());
// setting log message version to null, similarly to the examples, which are not configuring LMFV
upgradeKafkaVersion.setLogMessageVersion(null);
Expand All @@ -73,14 +74,14 @@ void testUpgradeKafkaWithoutVersion() throws IOException {
UpgradeKafkaVersion upgradeKafkaVersion = UpgradeKafkaVersion.getKafkaWithVersionFromUrl(acrossUpgradeData.getFromKafkaVersionsUrl(), acrossUpgradeData.getStartingKafkaVersion());
upgradeKafkaVersion.setVersion(null);

final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());

// Setup env
setupEnvAndUpgradeClusterOperator(CO_NAMESPACE, testStorage, acrossUpgradeData, upgradeKafkaVersion);

final Map<String, String> zooSnapshot = PodUtils.podSnapshot(testStorage.getNamespaceName(), controllerSelector);
final Map<String, String> kafkaSnapshot = PodUtils.podSnapshot(testStorage.getNamespaceName(), brokerSelector);
final Map<String, String> eoSnapshot = DeploymentUtils.depSnapshot(testStorage.getNamespaceName(), KafkaResources.entityOperatorDeploymentName(clusterName));
Map<String, String> zooSnapshot = PodUtils.podSnapshot(testStorage.getNamespaceName(), controllerSelector);
Map<String, String> kafkaSnapshot = PodUtils.podSnapshot(testStorage.getNamespaceName(), brokerSelector);
Map<String, String> eoSnapshot = DeploymentUtils.depSnapshot(testStorage.getNamespaceName(), KafkaResources.entityOperatorDeploymentName(clusterName));

// Make snapshots of all Pods
makeComponentsSnapshots(testStorage.getNamespaceName());
Expand Down Expand Up @@ -109,11 +110,12 @@ void testUpgradeKafkaWithoutVersion() throws IOException {

@IsolatedTest
void testUpgradeAcrossVersionsWithUnsupportedKafkaVersion() throws IOException {
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
UpgradeKafkaVersion upgradeKafkaVersion = UpgradeKafkaVersion.getKafkaWithVersionFromUrl(acrossUpgradeData.getFromKafkaVersionsUrl(), acrossUpgradeData.getStartingKafkaVersion());
TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
Optional<UpgradeKafkaVersion> upgradeKafkaVersion = UpgradeKafkaVersion.getKafkaVersionSupportedBeforeUnsupportedAfterUpgrade(acrossUpgradeData.getFromKafkaVersionsUrl());
assumeTrue(upgradeKafkaVersion.isPresent(), "Supported Kafka versions after upgrade contains all supported Kafka versions before upgrade so test is skipped");

// Setup env
setupEnvAndUpgradeClusterOperator(CO_NAMESPACE, testStorage, acrossUpgradeData, upgradeKafkaVersion);
setupEnvAndUpgradeClusterOperator(CO_NAMESPACE, testStorage, acrossUpgradeData, upgradeKafkaVersion.get());

// Make snapshots of all Pods
makeComponentsSnapshots(TEST_SUITE_NAMESPACE);
Expand All @@ -136,7 +138,7 @@ void testUpgradeAcrossVersionsWithUnsupportedKafkaVersion() throws IOException {

@IsolatedTest
void testUpgradeAcrossVersionsWithNoKafkaVersion() throws IOException {
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
// Setup env
setupEnvAndUpgradeClusterOperator(CO_NAMESPACE, testStorage, acrossUpgradeData, null);

Expand Down

0 comments on commit 5ef7836

Please sign in to comment.