Skip to content

Commit

Permalink
[FLINK-32012] Create method prepareCrForRollback
Browse files Browse the repository at this point in the history
  • Loading branch information
ashangit committed Jun 26, 2023
1 parent 993dd95 commit af92e3a
Showing 1 changed file with 30 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,33 @@ public AbstractFlinkResourceReconciler(
this.resourceScaler = autoscalerFactory.create(kubernetesClient, eventRecorder);
}

private boolean prepareCrForRollback(
FlinkResourceContext<CR> ctx,
boolean specChanged,
SPEC currentDeploySpec,
SPEC lastReconciledSpec) {
var cr = ctx.getResource();
var reconciliationStatus = cr.getStatus().getReconciliationStatus();
if (!specChanged) {
// Rely on the last stable spec if rolling back and no change in the spec
cr.setSpec(cr.getStatus().getReconciliationStatus().deserializeLastStableSpec());
} else {
// Spec has changed while rolling back we should apply new spec and move to upgrading
// state
// Don't take in account changes on job.state as it could be overriden to running if the
// current spec is not valid
lastReconciledSpec.getJob().setState(currentDeploySpec.getJob().getState());
var specDiffRollingBack =
new ReflectiveDiffBuilder<>(
ctx.getDeploymentMode(), lastReconciledSpec, currentDeploySpec)
.build();
if (DiffType.IGNORE != specDiffRollingBack.getType()) {
reconciliationStatus.setState(ReconciliationState.UPGRADING);
}
}
return true;
}

@Override
public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
var cr = ctx.getResource();
Expand Down Expand Up @@ -138,24 +165,9 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
DiffType.IGNORE != diffType
|| reconciliationStatus.getState() == ReconciliationState.UPGRADING;

if (!specChanged && reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
// Rely on the last stable spec if rolling back and no change in the spec
cr.setSpec(cr.getStatus().getReconciliationStatus().deserializeLastStableSpec());
specChanged = true;
} else if (specChanged
&& reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
// Spec has changed while rolling back we should apply new spec and move to upgrading
// state
// Don't take in account changes on job.state as it could be overriden to running if the
// current spec is not valid
lastReconciledSpec.getJob().setState(currentDeploySpec.getJob().getState());
var specDiffRollingBack =
new ReflectiveDiffBuilder<>(
ctx.getDeploymentMode(), lastReconciledSpec, currentDeploySpec)
.build();
if (DiffType.IGNORE != specDiffRollingBack.getType()) {
reconciliationStatus.setState(ReconciliationState.UPGRADING);
}
if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
specChanged =
prepareCrForRollback(ctx, specChanged, currentDeploySpec, lastReconciledSpec);
}

var observeConfig = ctx.getObserveConfig();
Expand Down

0 comments on commit af92e3a

Please sign in to comment.