Skip to content

Commit

Permalink
[FLINK-32012] Do not rely on lastRollbackSpec
Browse files Browse the repository at this point in the history
  • Loading branch information
ashangit committed Jun 23, 2023
1 parent 70a17e8 commit 6c056df
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 79 deletions.
2 changes: 0 additions & 2 deletions docs/content/docs/custom-resource/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| reconciliationTimestamp | long | Epoch timestamp of the last successful reconcile operation. |
| lastReconciledSpec | java.lang.String | Last reconciled deployment spec. Used to decide whether further reconciliation steps are necessary. |
| lastStableSpec | java.lang.String | Last stable deployment spec according to the specified stability condition. If a rollback strategy is defined this will be the target to roll back to. |
| lastRollbackSpec | java.lang.String | Last rollback deployment spec. Used to decide whether further reconciliation steps are necessary. If a rollback happens, this is the spec used to determine whether the current spec has been updated. |
| state | org.apache.flink.kubernetes.operator.api.status.ReconciliationState | Deployment state of the last reconciled spec. |

### FlinkDeploymentStatus
Expand Down Expand Up @@ -216,7 +215,6 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| reconciliationTimestamp | long | Epoch timestamp of the last successful reconcile operation. |
| lastReconciledSpec | java.lang.String | Last reconciled deployment spec. Used to decide whether further reconciliation steps are necessary. |
| lastStableSpec | java.lang.String | Last stable deployment spec according to the specified stability condition. If a rollback strategy is defined this will be the target to roll back to. |
| lastRollbackSpec | java.lang.String | Last rollback deployment spec. Used to decide whether further reconciliation steps are necessary. If a rollback happens, this is the spec used to determine whether the current spec has been updated. |
| state | org.apache.flink.kubernetes.operator.api.status.ReconciliationState | Deployment state of the last reconciled spec. |

### FlinkSessionJobStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ public abstract class ReconciliationStatus<SPEC extends AbstractFlinkSpec> {
*/
private String lastStableSpec;

/**
* Last rollback deployment spec. Used to decide whether further reconciliation steps are
* necessary. If a rollback happens, this is the spec used to determine whether the current
* spec has been updated.
*/
private String lastRollbackSpec;

/** Deployment state of the last reconciled spec. */
private ReconciliationState state = ReconciliationState.UPGRADING;

Expand All @@ -73,12 +66,6 @@ public SPEC deserializeLastStableSpec() {
return specWithMeta != null ? specWithMeta.getSpec() : null;
}

/**
 * Deserializes the last rollback spec and strips its metadata wrapper.
 *
 * @return the spec carried by {@link #deserializeLastRollbackSpecWithMeta()}, or {@code null}
 *     when that call yields {@code null}.
 */
@JsonIgnore
public SPEC deserializeLastRollbackSpec() {
    var rollbackSpecWithMeta = deserializeLastRollbackSpecWithMeta();
    if (rollbackSpecWithMeta == null) {
        return null;
    }
    return rollbackSpecWithMeta.getSpec();
}

@JsonIgnore
public SpecWithMeta<SPEC> deserializeLastReconciledSpecWithMeta() {
return SpecUtils.deserializeSpecWithMeta(lastReconciledSpec, getSpecClass());
Expand All @@ -89,23 +76,12 @@ public SpecWithMeta<SPEC> deserializeLastStableSpecWithMeta() {
return SpecUtils.deserializeSpecWithMeta(lastStableSpec, getSpecClass());
}

/**
 * Deserializes the stored {@code lastRollbackSpec} string together with its metadata.
 *
 * @return whatever {@code SpecUtils.deserializeSpecWithMeta} produces for the stored rollback
 *     spec and this status' spec class.
 */
@JsonIgnore
public SpecWithMeta<SPEC> deserializeLastRollbackSpecWithMeta() {
    var specClass = getSpecClass();
    return SpecUtils.deserializeSpecWithMeta(lastRollbackSpec, specClass);
}

/**
 * Serializes the given spec via {@code SpecUtils.writeSpecWithMeta} and stores the result as
 * the last reconciled spec.
 *
 * @param spec Spec to serialize.
 * @param resource Resource passed to {@code SpecUtils.writeSpecWithMeta} alongside the spec.
 */
@JsonIgnore
public void serializeAndSetLastReconciledSpec(
        SPEC spec, AbstractFlinkResource<SPEC, ?> resource) {
    var serializedSpec = SpecUtils.writeSpecWithMeta(spec, resource);
    setLastReconciledSpec(serializedSpec);
}

/**
 * Serializes the given spec via {@code SpecUtils.writeSpecWithMeta} and stores the result as
 * the last rollback spec.
 *
 * @param spec Spec to serialize.
 * @param resource Resource passed to {@code SpecUtils.writeSpecWithMeta} alongside the spec.
 */
@JsonIgnore
public void serializeAndSetLastRollbackSpec(
        SPEC spec, AbstractFlinkResource<SPEC, ?> resource) {
    var serializedSpec = SpecUtils.writeSpecWithMeta(spec, resource);
    setLastRollbackSpec(serializedSpec);
}

/** Promotes the last reconciled spec to last stable spec by copying the serialized value. */
public void markReconciledSpecAsStable() {
    this.lastStableSpec = this.lastReconciledSpec;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Contex
previousDeployment,
false);
}
// Rely on the last stable spec if under a roll back
// Rely on the last stable spec if rolling back
if (flinkApp.getStatus().getReconciliationStatus().getState()
== ReconciliationState.ROLLING_BACK) {
flinkApp.setSpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class ReconciliationUtils {
private static final Logger LOG = LoggerFactory.getLogger(ReconciliationUtils.class);

private static final ObjectMapper objectMapper = new ObjectMapper();

/**
* Update status after successful deployment of a new resource spec. Existing reconciliation
* errors will be cleared, lastReconciled spec will be updated and for suspended jobs it will
Expand Down Expand Up @@ -131,9 +132,13 @@ private static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconcil
}
reconciliationStatus.setState(state);

var clonedSpec = ReconciliationUtils.clone(spec);
if (status.getReconciliationStatus().getState() == ReconciliationState.ROLLING_BACK
|| status.getReconciliationStatus().getState() == ReconciliationState.ROLLED_BACK) {
clonedSpec = reconciliationStatus.deserializeLastReconciledSpec();
}
if (spec.getJob() != null) {
// For jobs we have to adjust the reconciled spec
var clonedSpec = ReconciliationUtils.clone(spec);
var job = clonedSpec.getJob();
job.setState(stateAfterReconcile);

Expand All @@ -156,7 +161,7 @@ private static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconcil
reconciliationStatus.markReconciledSpecAsStable();
}
} else {
reconciliationStatus.serializeAndSetLastReconciledSpec(spec, target);
reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target);
}
}

Expand Down Expand Up @@ -284,17 +289,6 @@ public static boolean isUpgradeModeChangedToLastStateAndHADisabledPreviously(
&& !HighAvailabilityMode.isHighAvailabilityModeActivated(observeConfig);
}

/**
 * Returns the spec to treat as the "last" one for the given resource, depending on its
 * reconciliation state.
 *
 * @param deployment Resource whose reconciliation status is inspected.
 * @param <SPEC> Spec type of the resource.
 * @return the deserialized last rollback spec when the state is {@code ROLLED_BACK}, otherwise
 *     the deserialized last reconciled spec.
 */
public static <SPEC extends AbstractFlinkSpec> SPEC getLastSpec(
        AbstractFlinkResource<SPEC, ?> deployment) {
    var status = deployment.getStatus().getReconciliationStatus();
    if (status.getState() == ReconciliationState.ROLLED_BACK) {
        return status.deserializeLastRollbackSpec();
    }
    return status.deserializeLastReconciledSpec();
}

public static <SPEC extends AbstractFlinkSpec> SPEC getDeployedSpec(
AbstractFlinkResource<SPEC, ?> deployment) {
var reconciliationStatus = deployment.getStatus().getReconciliationStatus();
Expand Down Expand Up @@ -354,14 +348,14 @@ public static <SPEC extends AbstractFlinkSpec> boolean applyValidationErrorAndRe
deployment, new ValidationException(validationError), conf);
}

var lastSpec = getLastSpec(deployment);
var lastReconciledSpec = status.getReconciliationStatus().deserializeLastReconciledSpec();

if (lastSpec == null) {
if (lastReconciledSpec == null) {
// Validation failed before anything was deployed, nothing to do
return false;
} else {
// We need to observe/reconcile using the last version of the deployment spec
deployment.setSpec(lastSpec);
deployment.setSpec(lastReconciledSpec);
if (status.getReconciliationStatus().getState() == ReconciliationState.UPGRADING
|| status.getReconciliationStatus().getState()
== ReconciliationState.ROLLING_BACK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
Expand Down Expand Up @@ -128,10 +127,11 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {
return;
}

SPEC lastSpec = ReconciliationUtils.getLastSpec(cr);
SPEC lastReconciledSpec =
cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
SPEC currentDeploySpec = cr.getSpec();

var specDiff = new ReflectiveDiffBuilder<>(lastSpec, currentDeploySpec).build();
var specDiff = new ReflectiveDiffBuilder<>(lastReconciledSpec, currentDeploySpec).build();

boolean specChanged =
DiffType.IGNORE != specDiff.getType()
Expand Down Expand Up @@ -169,7 +169,7 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws Exception {

if (shouldRollBack(ctx, observeConfig)) {
// Rollbacks are executed in two steps, we initiate it first then return
if (initiateRollBack(ctx, status)) {
if (initiateRollBack(status)) {
return;
}
LOG.warn(MSG_ROLLBACK);
Expand Down Expand Up @@ -399,13 +399,10 @@ private boolean shouldRollBack(FlinkResourceContext<CR> ctx, Configuration confi
/**
* Initiate rollback process by changing the {@link ReconciliationState} in the status.
*
* @param ctx Reconciliation context.
* @param status Resource status.
* @return True if a new rollback was initiated.
*/
private boolean initiateRollBack(FlinkResourceContext<CR> ctx, STATUS status)
throws JsonProcessingException {
var target = ctx.getResource();
private boolean initiateRollBack(STATUS status) {
var reconciliationStatus = status.getReconciliationStatus();
if (reconciliationStatus.getState() != ReconciliationState.ROLLING_BACK) {
LOG.warn("Preparing to roll back to last stable spec.");
Expand All @@ -414,7 +411,6 @@ private boolean initiateRollBack(FlinkResourceContext<CR> ctx, STATUS status)
"Deployment is not ready within the configured timeout, rolling back.");
}
reconciliationStatus.setState(ReconciliationState.ROLLING_BACK);
reconciliationStatus.serializeAndSetLastRollbackSpec(target.getSpec(), target);
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ public void testStatefulRollback(UpgradeMode upgradeMode) throws Exception {
dep.getSpec().setRestartNonce(10L);
testController.reconcile(dep, context);
},
true,
UpgradeMode.LAST_STATE);
true);
}

@Test
Expand Down Expand Up @@ -162,8 +161,7 @@ public void testRollbackFailureWithLastState() throws Exception {
testController.reconcile(dep, context);
flinkService.setPortReady(true);
},
false,
UpgradeMode.LAST_STATE);
false);
}

@Test
Expand Down Expand Up @@ -216,8 +214,7 @@ public void testRollbackStateless() throws Exception {
dep.getSpec().setRestartNonce(10L);
testController.reconcile(dep, context);
},
true,
UpgradeMode.STATELESS);
true);
}

@Test
Expand All @@ -239,16 +236,14 @@ public void testRollbackSession() throws Exception {
dep.getStatus().getJobManagerDeploymentStatus());
dep.getSpec().setRestartNonce(10L);
},
false,
null);
false);
}

public void testRollback(
FlinkDeployment deployment,
ThrowingRunnable<Exception> triggerRollback,
ThrowingRunnable<Exception> validateAndRecover,
boolean injectValidationError,
UpgradeMode expectedUpgradeMode)
boolean injectValidationError)
throws Exception {

var flinkConfiguration = deployment.getSpec().getFlinkConfiguration();
Expand Down Expand Up @@ -316,25 +311,18 @@ public void testRollback(

testController.reconcile(deployment, context);
testController.reconcile(deployment, context);
assertEquals(
ReconciliationState.ROLLED_BACK,
deployment.getStatus().getReconciliationStatus().getState());
var lastStable =
deployment.getStatus().getReconciliationStatus().deserializeLastStableSpec();
var lastReconcile =
deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
if (lastStable.getJob() != null) {
lastStable.getJob().setUpgradeMode(expectedUpgradeMode);
}
assertEquals(lastStable, lastReconcile);
assertNotEquals(
deployment.getStatus().getReconciliationStatus().deserializeLastStableSpec(),
deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());

deployment.getSpec().getFlinkConfiguration().put("random2", "config");
deployment.setSpec(
deployment.getStatus().getReconciliationStatus().deserializeLastStableSpec());
testController.reconcile(deployment, context);
testController.reconcile(deployment, context);
assertEquals(
ReconciliationState.DEPLOYED,
deployment.getStatus().getReconciliationStatus().getState());
assertNotEquals(
assertEquals(
deployment.getStatus().getReconciliationStatus().deserializeLastStableSpec(),
deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9303,8 +9303,6 @@ spec:
type: string
lastStableSpec:
type: string
lastRollbackSpec:
type: string
state:
enum:
- DEPLOYED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ spec:
type: string
lastStableSpec:
type: string
lastRollbackSpec:
type: string
state:
enum:
- DEPLOYED
Expand Down

0 comments on commit 6c056df

Please sign in to comment.