Skip to content

Commit

Permalink
Remove duplicate methods in Connect operators (#10604)
Browse files Browse the repository at this point in the history
Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com>
  • Loading branch information
katheris committed Sep 20, 2024
1 parent f7c1928 commit 0c7601a
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ServiceAccount;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
import io.fabric8.kubernetes.client.CustomResource;
Expand All @@ -26,6 +27,7 @@
import io.strimzi.api.kafka.model.connector.ListOffsets;
import io.strimzi.api.kafka.model.kafka.Status;
import io.strimzi.api.kafka.model.mirrormaker2.KafkaMirrorMaker2;
import io.strimzi.api.kafka.model.podset.StrimziPodSet;
import io.strimzi.operator.cluster.ClusterOperatorConfig;
import io.strimzi.operator.cluster.PlatformFeaturesAvailability;
import io.strimzi.operator.cluster.model.ConfigMapUtils;
Expand Down Expand Up @@ -74,6 +76,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -173,6 +176,30 @@ public ConnectOperatorMetricsHolder metrics() {
}
}

/**
* Gets the Deployment and StrimziPodSet for the Connect cluster
*
* @param reconciliation The reconciliation
* @param connect KafkaConnectCluster object
* @param deploymentReference Deployment reference
* @param podSetReference StrimziPodSet reference
*
* @return Future for tracking the asynchronous result of getting the resources
*/
protected Future<Void> controllerResources(Reconciliation reconciliation,
KafkaConnectCluster connect,
AtomicReference<Deployment> deploymentReference,
AtomicReference<StrimziPodSet> podSetReference) {
return Future
.join(deploymentOperations.getAsync(reconciliation.namespace(), connect.getComponentName()), podSetOperations.getAsync(reconciliation.namespace(), connect.getComponentName()))
.compose(res -> {
deploymentReference.set(res.resultAt(0));
podSetReference.set(res.resultAt(1));

return Future.succeededFuture();
});
}

/**
* Reconciles the ServiceAccount for the Connect cluster.
*
Expand Down Expand Up @@ -257,6 +284,30 @@ protected Future<ConfigMap> generateMetricsAndLoggingConfigMap(Reconciliation re
.compose(metricsAndLoggingCm -> Future.succeededFuture(kafkaConnectCluster.generateMetricsAndLogConfigMap(metricsAndLoggingCm)));
}

/**
* Reconciles the StrimziPodSet for the Connect cluster
*
* @param reconciliation The reconciliation
* @param connect KafkaConnectCluster object
* @param podAnnotations Pod annotations
* @param podSetAnnotations StrimziPodSet annotations
* @param customContainerImage Custom container image
*
* @return Future for tracking the asynchronous result of reconciling the StrimziPodSet
*/
protected Future<Void> reconcilePodSet(Reconciliation reconciliation,
KafkaConnectCluster connect,
Map<String, String> podAnnotations,
Map<String, String> podSetAnnotations,
String customContainerImage) {
return podSetOperations.reconcile(reconciliation, reconciliation.namespace(), connect.getComponentName(), connect.generatePodSet(connect.getReplicas(), podSetAnnotations, podAnnotations, pfa.isOpenshift(), imagePullPolicy, imagePullSecrets, customContainerImage))
.compose(reconciliationResult -> {
KafkaConnectRoller roller = new KafkaConnectRoller(reconciliation, connect, operationTimeoutMs, podOperations);
return roller.maybeRoll(PodSetUtils.podNames(reconciliationResult.resource()), pod -> KafkaConnectRoller.needsRollingRestart(reconciliationResult.resource(), pod));
})
.compose(i -> podSetOperations.readiness(reconciliation, reconciliation.namespace(), connect.getComponentName(), 1_000, operationTimeoutMs));
}

/**
* Dynamically updates loggers in the Kafka Connect cluster.
*
Expand Down Expand Up @@ -414,7 +465,7 @@ private Future<List<Condition>> updateState(Reconciliation reconciliation, Strin
return Future.succeededFuture(conditions);
}
}
return future.compose(ignored -> Future.succeededFuture(conditions));
return future.map(ignored -> conditions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.strimzi.operator.cluster.model.KafkaConnectCluster;
import io.strimzi.operator.cluster.model.KafkaConnectorOffsetsAnnotation;
import io.strimzi.operator.cluster.model.NoSuchResourceException;
import io.strimzi.operator.cluster.model.PodSetUtils;
import io.strimzi.operator.cluster.operator.VertxUtil;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
import io.strimzi.operator.cluster.operator.resource.kubernetes.CrdOperator;
Expand Down Expand Up @@ -249,33 +248,6 @@ protected Future<KafkaConnectStatus> createOrUpdate(Reconciliation reconciliatio
return createOrUpdatePromise.future();
}

private Future<Void> controllerResources(Reconciliation reconciliation,
KafkaConnectCluster connect,
AtomicReference<Deployment> deploymentReference,
AtomicReference<StrimziPodSet> podSetReference) {
return Future
.join(deploymentOperations.getAsync(reconciliation.namespace(), connect.getComponentName()), podSetOperations.getAsync(reconciliation.namespace(), connect.getComponentName()))
.compose(res -> {
deploymentReference.set(res.resultAt(0));
podSetReference.set(res.resultAt(1));

return Future.succeededFuture();
});
}

private Future<Void> reconcilePodSet(Reconciliation reconciliation,
KafkaConnectCluster connect,
Map<String, String> podAnnotations,
Map<String, String> podSetAnnotations,
String customContainerImage) {
return podSetOperations.reconcile(reconciliation, reconciliation.namespace(), connect.getComponentName(), connect.generatePodSet(connect.getReplicas(), podSetAnnotations, podAnnotations, pfa.isOpenshift(), imagePullPolicy, imagePullSecrets, customContainerImage))
.compose(reconciliationResult -> {
KafkaConnectRoller roller = new KafkaConnectRoller(reconciliation, connect, operationTimeoutMs, podOperations);
return roller.maybeRoll(PodSetUtils.podNames(reconciliationResult.resource()), pod -> KafkaConnectRoller.needsRollingRestart(reconciliationResult.resource(), pod));
})
.compose(i -> podSetOperations.readiness(reconciliation, reconciliation.namespace(), connect.getComponentName(), 1_000, operationTimeoutMs));
}

@Override
protected KafkaConnectStatus createStatus(KafkaConnect ignored) {
return new KafkaConnectStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.strimzi.operator.cluster.model.KafkaConnectCluster;
import io.strimzi.operator.cluster.model.KafkaConnectorOffsetsAnnotation;
import io.strimzi.operator.cluster.model.KafkaMirrorMaker2Cluster;
import io.strimzi.operator.cluster.model.PodSetUtils;
import io.strimzi.operator.cluster.operator.VertxUtil;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
import io.strimzi.operator.common.Annotations;
Expand Down Expand Up @@ -163,7 +162,7 @@ protected Future<KafkaMirrorMaker2Status> createOrUpdate(Reconciliation reconcil
podOperations
)
.migrateFromDeploymentToStrimziPodSets(deployment.get(), podSet.get()))
.compose(i -> reconcilePodSet(reconciliation, mirrorMaker2Cluster, podAnnotations))
.compose(i -> reconcilePodSet(reconciliation, mirrorMaker2Cluster, podAnnotations, null, null))
.compose(i -> hasZeroReplicas ? Future.succeededFuture() : reconcileConnectLoggers(reconciliation, KafkaMirrorMaker2Resources.qualifiedServiceName(reconciliation.name(), namespace), desiredLogging.get(), mirrorMaker2Cluster.defaultLogConfig()))
.compose(i -> hasZeroReplicas ? Future.succeededFuture() : reconcileConnectors(reconciliation, kafkaMirrorMaker2, mirrorMaker2Cluster, kafkaMirrorMaker2Status))
.map((Void) null)
Expand All @@ -190,31 +189,6 @@ protected Future<KafkaMirrorMaker2Status> createOrUpdate(Reconciliation reconcil
return createOrUpdatePromise.future();
}

private Future<Void> controllerResources(Reconciliation reconciliation,
KafkaMirrorMaker2Cluster mirrorMaker2Cluster,
AtomicReference<Deployment> deploymentReference,
AtomicReference<StrimziPodSet> podSetReference) {
return Future
.join(deploymentOperations.getAsync(reconciliation.namespace(), mirrorMaker2Cluster.getComponentName()), podSetOperations.getAsync(reconciliation.namespace(), mirrorMaker2Cluster.getComponentName()))
.compose(res -> {
deploymentReference.set(res.resultAt(0));
podSetReference.set(res.resultAt(1));

return Future.succeededFuture();
});
}

private Future<Void> reconcilePodSet(Reconciliation reconciliation,
KafkaConnectCluster connect,
Map<String, String> podAnnotations) {
return podSetOperations.reconcile(reconciliation, reconciliation.namespace(), connect.getComponentName(), connect.generatePodSet(connect.getReplicas(), null, podAnnotations, pfa.isOpenshift(), imagePullPolicy, imagePullSecrets, null))
.compose(reconciliationResult -> {
KafkaConnectRoller roller = new KafkaConnectRoller(reconciliation, connect, operationTimeoutMs, podOperations);
return roller.maybeRoll(PodSetUtils.podNames(reconciliationResult.resource()), pod -> KafkaConnectRoller.needsRollingRestart(reconciliationResult.resource(), pod));
})
.compose(i -> podSetOperations.readiness(reconciliation, reconciliation.namespace(), connect.getComponentName(), 1_000, operationTimeoutMs));
}

@Override
protected KafkaMirrorMaker2Status createStatus(KafkaMirrorMaker2 ignored) {
return new KafkaMirrorMaker2Status();
Expand Down Expand Up @@ -269,7 +243,7 @@ private Future<Integer> generateAuthHash(String namespace, KafkaMirrorMaker2Spec
currentConnectors.removeAll(desiredConnectors.stream().map(c -> c.getMetadata().getName()).collect(Collectors.toSet()));

Future<Void> deletionFuture = deleteConnectors(reconciliation, host, apiClient, currentConnectors);
Future<Void> createOrUpdateFuture = reconcileConnectors(reconciliation, host, apiClient, kafkaMirrorMaker2, desiredConnectors, mirrorMaker2Status);
Future<Void> createOrUpdateFuture = createOrUpdateConnectors(reconciliation, host, apiClient, kafkaMirrorMaker2, desiredConnectors, mirrorMaker2Status);

return Future.join(deletionFuture, createOrUpdateFuture).map((Void) null);
});
Expand All @@ -285,11 +259,11 @@ private static Future<Void> deleteConnectors(Reconciliation reconciliation, Stri
.map((Void) null);
}

private Future<Void> reconcileConnectors(Reconciliation reconciliation, String host, KafkaConnectApi apiClient, KafkaMirrorMaker2 mirrorMaker2, List<KafkaConnector> connectors, KafkaMirrorMaker2Status mirrorMaker2Status) {
private Future<Void> createOrUpdateConnectors(Reconciliation reconciliation, String host, KafkaConnectApi apiClient, KafkaMirrorMaker2 mirrorMaker2, List<KafkaConnector> connectors, KafkaMirrorMaker2Status mirrorMaker2Status) {
return Future.join(connectors.stream()
.map(connector -> {
LOGGER.debugCr(reconciliation, "Creating / updating connector {}", connector.getMetadata().getName());
return reconcileMirrorMaker2Connector(reconciliation, mirrorMaker2, apiClient, host, connector.getMetadata().getName(), connector.getSpec(), mirrorMaker2Status);
return createOrUpdateMirrorMaker2Connector(reconciliation, mirrorMaker2, apiClient, host, connector.getMetadata().getName(), connector.getSpec(), mirrorMaker2Status);
})
.collect(Collectors.toList()))
.compose(i -> {
Expand All @@ -305,10 +279,10 @@ private Future<Void> reconcileConnectors(Reconciliation reconciliation, String h
return Future.succeededFuture();
}
})
.map((Void) null);
.mapEmpty();
}

private Future<Void> reconcileMirrorMaker2Connector(Reconciliation reconciliation, KafkaMirrorMaker2 mirrorMaker2, KafkaConnectApi apiClient, String host, String connectorName, KafkaConnectorSpec connectorSpec, KafkaMirrorMaker2Status mirrorMaker2Status) {
private Future<Void> createOrUpdateMirrorMaker2Connector(Reconciliation reconciliation, KafkaMirrorMaker2 mirrorMaker2, KafkaConnectApi apiClient, String host, String connectorName, KafkaConnectorSpec connectorSpec, KafkaMirrorMaker2Status mirrorMaker2Status) {
return maybeCreateOrUpdateConnector(reconciliation, host, apiClient, connectorName, connectorSpec, mirrorMaker2)
.onComplete(result -> {
if (result.succeeded()) {
Expand All @@ -324,7 +298,7 @@ private Future<Void> reconcileMirrorMaker2Connector(Reconciliation reconciliatio
} else {
maybeUpdateMirrorMaker2Status(reconciliation, mirrorMaker2, result.cause());
}
}).compose(ignored -> Future.succeededFuture());
}).mapEmpty();
}

private Future<Void> maybeUpdateMirrorMaker2Status(Reconciliation reconciliation, KafkaMirrorMaker2 mirrorMaker2, Throwable error) {
Expand Down

0 comments on commit 0c7601a

Please sign in to comment.