Skip to content

Commit

Permalink
Refactor GatewayService (elastic#99994)
Browse files Browse the repository at this point in the history
This PR refactors GatewayService with the goal to make it easier to add
new features.

Resolves: elastic#89310
  • Loading branch information
ywangd authored Oct 10, 2023
1 parent 4369e79 commit e351c68
Show file tree
Hide file tree
Showing 4 changed files with 598 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public interface ClusterStateTaskExecutor<T extends ClusterStateTaskListener> {
* already have become master and updated the state in a way that would be inconsistent with the response that {@code N} sends back to
* clients.
*
* @return The resulting cluster state after executing all the tasks. If {code batchExecutionContext.initialState()} is returned then no
* update is published.
* @return The resulting cluster state after executing all the tasks. If {@code batchExecutionContext.initialState()} is returned then
* no update is published.
*/
ClusterState execute(BatchExecutionContext<T> batchExecutionContext) throws Exception;

Expand Down
197 changes: 131 additions & 66 deletions server/src/main/java/org/elasticsearch/gateway/GatewayService.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -80,9 +82,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
private final TimeValue recoverAfterTime;
private final int recoverAfterDataNodes;
private final int expectedDataNodes;

private final AtomicBoolean recoveryInProgress = new AtomicBoolean();
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
volatile PendingStateRecovery currentPendingStateRecovery;

@Inject
public GatewayService(
Expand Down Expand Up @@ -131,8 +131,9 @@ public void clusterChanged(final ClusterChangedEvent event) {
}

final ClusterState state = event.state();
final DiscoveryNodes nodes = state.nodes();

if (state.nodes().isLocalNodeElectedMaster() == false) {
if (nodes.isLocalNodeElectedMaster() == false) {
// not our job to recover
return;
}
Expand All @@ -141,83 +142,153 @@ public void clusterChanged(final ClusterChangedEvent event) {
return;
}

final DiscoveryNodes nodes = state.nodes();
if (state.nodes().getMasterNodeId() == null) {
logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) {
logger.debug(
"not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]",
nodes.getDataNodes().size(),
recoverAfterDataNodes
);
} else {
boolean enforceRecoverAfterTime;
String reason;
if (expectedDataNodes == -1) {
// no expected is set, honor recover_after_data_nodes
enforceRecoverAfterTime = true;
reason = "recover_after_time was set to [" + recoverAfterTime + "]";
} else if (expectedDataNodes <= nodes.getDataNodes().size()) {
// expected is set and satisfied so recover immediately
enforceRecoverAfterTime = false;
reason = "";
// At this point, we know the state is not recovered and this node is qualified for state recovery
// But we still need to check whether a previous one is running already
final long currentTerm = state.term();
final PendingStateRecovery existingPendingStateRecovery = currentPendingStateRecovery;

// Always start a new state recovery if the master term changes
// If there is a previous one still waiting, both will probably run but at most one of them will
// actually make changes to cluster state because either:
// 1. The previous recovers the cluster state and the current one will be skipped
// 2. The previous one sees a new cluster term and skips its own execution
if (existingPendingStateRecovery == null || existingPendingStateRecovery.expectedTerm < currentTerm) {
currentPendingStateRecovery = new PendingStateRecovery(currentTerm);
}
currentPendingStateRecovery.onDataNodeSize(nodes.getDataNodes().size());
}

/**
* This class manages the cluster state recovery behaviours. It has two major scenarios depending
* on whether {@code recoverAfterDataNodes} is configured.
*
* <p> <b>When</b> {@code recoverAfterDataNodes} is configured:
* <ol>
* <li>Nothing can happen until it is reached
* <li>When {@code recoverAfterDataNodes} is reached, the cluster either:
* <ul>
* <li>Recover immediately when {@code expectedDataNodes} is reached or
* both {@code expectedDataNodes} and {@code recoverAfterTime} are not configured
* <li>Or schedule a recovery with a delay of {@code recoverAfterTime}
* </ul>
* <li>The scheduled recovery can be cancelled if {@code recoverAfterDataNodes} drops below required number
* before the recovery can happen. When this happens, the process goes back to the beginning (step 1).
* <li>The recovery is scheduled only once each time {@code recoverAfterDataNodes} crosses the required number
* </ol>
*
* <p> <b>When</b> {@code recoverAfterDataNodes} is <b>Not</b> configured, the cluster either:
* <ul>
* <li>Recover immediately when {@code expectedDataNodes} is reached or
* both {@code expectedDataNodes} and {@code recoverAfterTime} are not configured
* <li>Or schedule a recovery with a delay of {@code recoverAfterTime}
* </ul>
*/
class PendingStateRecovery {
private final long expectedTerm;
@Nullable
private Scheduler.ScheduledCancellable scheduledRecovery;
private final AtomicBoolean taskSubmitted = new AtomicBoolean();

PendingStateRecovery(long expectedTerm) {
this.expectedTerm = expectedTerm;
}

void onDataNodeSize(int currentDataNodeSize) {
if (recoverAfterDataNodes != -1 && currentDataNodeSize < recoverAfterDataNodes) {
logger.debug(
"not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]",
currentDataNodeSize,
recoverAfterDataNodes
);
cancelScheduledRecovery();
} else {
// expected is set but not satisfied so wait until it is satisfied or times out
enforceRecoverAfterTime = true;
reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.getDataNodes().size() + "]";
maybePerformOrScheduleRecovery(currentDataNodeSize);
}
performStateRecovery(enforceRecoverAfterTime, reason);
}
}

private void performStateRecovery(final boolean enforceRecoverAfterTime, final String reason) {
if (enforceRecoverAfterTime && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) {
logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason);
threadPool.schedule(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("delayed state recovery failed", e);
resetRecoveredFlags();
}

@Override
protected void doRun() {
if (recoveryInProgress.compareAndSet(false, true)) {
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
runRecovery();
void maybePerformOrScheduleRecovery(int currentDataNodeSize) {
if (expectedDataNodes != -1 && expectedDataNodes <= currentDataNodeSize) {
logger.debug(
"performing state recovery of term [{}], expected data nodes [{}] is reached",
expectedTerm,
expectedDataNodes
);
cancelScheduledRecovery();
runRecoveryImmediately();
} else if (recoverAfterTime == null) {
logger.debug("performing state recovery of term [{}], no delay time is configured", expectedTerm);
cancelScheduledRecovery();
runRecoveryImmediately();
} else {
if (scheduledRecovery == null) {
logger.info(
"delaying initial state recovery for [{}] of term [{}]. expecting [{}] data nodes, but only have [{}]",
recoverAfterTime,
expectedTerm,
expectedDataNodes,
currentDataNodeSize
);
scheduledRecovery = threadPool.schedule(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("delayed state recovery of term [" + expectedTerm + "] failed", e);
}
}
}, recoverAfterTime, threadPool.generic());
}
} else {
if (recoveryInProgress.compareAndSet(false, true)) {
try {
logger.debug("performing state recovery...");
runRecovery();
} catch (Exception e) {
logger.warn("state recovery failed", e);
resetRecoveredFlags();

@Override
protected void doRun() {
final PendingStateRecovery existingPendingStateRecovery = currentPendingStateRecovery;
if (PendingStateRecovery.this == existingPendingStateRecovery) {
runRecoveryImmediately();
} else {
logger.debug(
"skip scheduled state recovery since a new one of term [{}] has started",
existingPendingStateRecovery.expectedTerm
);
}
}
}, recoverAfterTime, threadPool.generic());
} else {
logger.debug("state recovery is in already scheduled for term [{}]", expectedTerm);
}
}
}
}

private void resetRecoveredFlags() {
recoveryInProgress.set(false);
scheduledRecovery.set(false);
void runRecoveryImmediately() {
if (taskSubmitted.compareAndSet(false, true)) {
submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask(expectedTerm));
} else {
logger.debug("state recovery task is already submitted");
}
}

void cancelScheduledRecovery() {
if (scheduledRecovery != null) {
scheduledRecovery.cancel();
scheduledRecovery = null;
}
}
}

private static final String TASK_SOURCE = "local-gateway-elected-state";

class RecoverStateUpdateTask extends ClusterStateUpdateTask {

private final long expectedTerm;

RecoverStateUpdateTask(long expectedTerm) {
this.expectedTerm = expectedTerm;
}

@Override
public ClusterState execute(final ClusterState currentState) {
if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
logger.debug("cluster is already recovered");
return currentState;
}
if (expectedTerm != currentState.term()) {
logger.debug("skip state recovery since current term [{}] != expected term [{}]", currentState.term(), expectedTerm);
return currentState;
}
return ClusterStateUpdaters.removeStateNotRecoveredBlock(
ClusterStateUpdaters.updateRoutingTable(currentState, shardRoutingRoleStrategy)
);
Expand All @@ -228,7 +299,6 @@ public void clusterStateProcessed(final ClusterState oldState, final ClusterStat
logger.info("recovered [{}] indices into cluster_state", newState.metadata().indices().size());
// reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
// not-recovered state, that we again do another state recovery.
resetRecoveredFlags();
rerouteService.reroute("state recovered", Priority.NORMAL, ActionListener.noop());
}

Expand All @@ -239,7 +309,6 @@ public void onFailure(final Exception e) {
() -> "unexpected failure during [" + TASK_SOURCE + "]",
e
);
resetRecoveredFlags();
}
}

Expand All @@ -248,10 +317,6 @@ TimeValue recoverAfterTime() {
return recoverAfterTime;
}

private void runRecovery() {
submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask());
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
Expand Down
Loading

0 comments on commit e351c68

Please sign in to comment.