Skip to content

Commit

Permalink
[FLINK-32012] Take in account case where new spec submitted while rol…
Browse files Browse the repository at this point in the history
…ling back
  • Loading branch information
ashangit committed Jun 26, 2023
1 parent db347cb commit c22f02d
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
Expand Down Expand Up @@ -141,12 +140,6 @@ public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Contex
previousDeployment,
false);
}
// Rely on the last stable spec if rolling back
if (flinkApp.getStatus().getReconciliationStatus().getState()
== ReconciliationState.ROLLING_BACK) {
flinkApp.setSpec(
flinkApp.getStatus().getReconciliationStatus().deserializeLastStableSpec());
}
statusRecorder.patchAndCacheStatus(flinkApp);
reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);
} catch (RecoveryFailureException rfe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
Expand All @@ -39,7 +38,6 @@
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult;
import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
Expand All @@ -58,6 +56,9 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.flink.kubernetes.operator.api.spec.JobState.RUNNING;
import static org.apache.flink.kubernetes.operator.api.spec.JobState.SUSPENDED;

/**
* Base class for all Flink resource reconcilers. It contains the general flow of reconciling Flink
* related resources including initial deployments, upgrades, rollbacks etc.
Expand Down Expand Up @@ -137,8 +138,27 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {

boolean specChanged =
DiffType.IGNORE != diffType
|| reconciliationStatus.getState() == ReconciliationState.UPGRADING
|| reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK;
|| reconciliationStatus.getState() == ReconciliationState.UPGRADING;

if (!specChanged && reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
// Rely on the last stable spec if rolling back and no change in the spec
cr.setSpec(cr.getStatus().getReconciliationStatus().deserializeLastStableSpec());
specChanged = true;
} else if (specChanged
&& reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
// The spec has changed while rolling back: apply the new spec and move to the
// UPGRADING state.
// Don't take into account changes to job.state, as it could be overridden to running if
// the current spec is not valid.
lastReconciledSpec.getJob().setState(currentDeploySpec.getJob().getState());
var specDiffRollingBack =
new ReflectiveDiffBuilder<>(
ctx.getDeploymentMode(), lastReconciledSpec, currentDeploySpec)
.build();
if (DiffType.IGNORE != specDiffRollingBack.getType()) {
reconciliationStatus.setState(ReconciliationState.UPGRADING);
}
}

var observeConfig = ctx.getObserveConfig();
if (specChanged) {
Expand Down Expand Up @@ -361,8 +381,7 @@ private boolean shouldRollBack(FlinkResourceContext<CR> ctx, Configuration confi
return false;
}

if (lastStableSpec.getJob() != null
&& lastStableSpec.getJob().getState() == JobState.SUSPENDED) {
if (lastStableSpec.getJob() != null && lastStableSpec.getJob().getState() == SUSPENDED) {
// Should not roll back to suspended state
return false;
}
Expand Down Expand Up @@ -449,7 +468,7 @@ protected boolean shouldRecoverDeployment(Configuration conf, FlinkDeployment de

private boolean jmMissingForRunningDeployment(FlinkDeployment deployment) {
var deployedJob = ReconciliationUtils.getDeployedSpec(deployment).getJob();
return (deployedJob == null || deployedJob.getState() == JobState.RUNNING)
return (deployedJob == null || deployedJob.getState() == RUNNING)
&& (deployment.getStatus().getJobManagerDeploymentStatus()
== JobManagerDeploymentStatus.MISSING);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ public void testRollback(

testController.reconcile(deployment, context);
testController.reconcile(deployment, context);
assertEquals(
ReconciliationState.ROLLED_BACK,
deployment.getStatus().getReconciliationStatus().getState());
assertNotEquals(
deployment.getStatus().getReconciliationStatus().deserializeLastStableSpec(),
deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
Expand Down

0 comments on commit c22f02d

Please sign in to comment.