Skip to content

Commit

Permalink
[FLINK-32012] Take in account case where new spec submitted while rol…
Browse files Browse the repository at this point in the history
…ling back
  • Loading branch information
ashangit committed Jun 26, 2023
1 parent 070472a commit cdf9850
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
Expand Down Expand Up @@ -141,12 +140,6 @@ public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Contex
previousDeployment,
false);
}
// Rely on the last stable spec if rolling back
if (flinkApp.getStatus().getReconciliationStatus().getState()
== ReconciliationState.ROLLING_BACK) {
flinkApp.setSpec(
flinkApp.getStatus().getReconciliationStatus().deserializeLastStableSpec());
}
statusRecorder.patchAndCacheStatus(flinkApp);
reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);
} catch (RecoveryFailureException rfe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,20 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {

boolean specChanged =
DiffType.IGNORE != specDiff.getType()
|| reconciliationStatus.getState() == ReconciliationState.UPGRADING
|| reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK;
|| reconciliationStatus.getState() == ReconciliationState.UPGRADING;

if (!specChanged && reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
// Rely on the last stable spec if rolling back and no change in the spec
cr.setSpec(cr.getStatus().getReconciliationStatus().deserializeLastStableSpec());
specChanged = true;
} else if (specChanged && reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
// Spec has changed while rolling back we should apply new spec and move to upgrading state
// Don't take in account changes on job.state as it could be overriden to running if the current
// spec is not valid
if (specDiff.getNumDiffs() == 1 && !"job.state".equals(specDiff.getDiffList().get(0).getFieldName())){
reconciliationStatus.setState(ReconciliationState.UPGRADING);
}
}

var observeConfig = ctx.getObserveConfig();
if (specChanged) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ public void testRollback(

testController.reconcile(deployment, context);
testController.reconcile(deployment, context);
assertEquals(
ReconciliationState.ROLLED_BACK,
deployment.getStatus().getReconciliationStatus().getState());
assertNotEquals(
deployment.getStatus().getReconciliationStatus().deserializeLastStableSpec(),
deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
Expand Down

0 comments on commit cdf9850

Please sign in to comment.