Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/all_jobmanager_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
<td>Boolean</td>
<td>This parameter defines whether the adaptive scheduler prioritizes using the minimum number of <code class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note, this parameter is suitable if <code class="highlighter-rouge">execution.state-recovery.from-local</code> is not enabled. More details about this configuration are available at <a href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>When enabled, the Adaptive Scheduler actively triggers a checkpoint when resources change and rescaling is desired, rather than waiting for the next periodic checkpoint. This reduces rescaling latency, especially when checkpoint intervals are large. The active trigger respects execution.checkpointing.min-pause and will not fire if a checkpoint is already in progress or being triggered.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@
<td>Boolean</td>
<td>This parameter defines whether the adaptive scheduler prioritizes using the minimum number of <code class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note, this parameter is suitable if <code class="highlighter-rouge">execution.state-recovery.from-local</code> is not enabled. More details about this configuration are available at <a href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>When enabled, the Adaptive Scheduler actively triggers a checkpoint when resources change and rescaling is desired, rather than waiting for the next periodic checkpoint. This reduces rescaling latency, especially when checkpoint intervals are large. The active trigger respects execution.checkpointing.min-pause and will not fire if a checkpoint is already in progress or being triggered.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
<td>Boolean</td>
<td>This parameter defines whether the adaptive scheduler prioritizes using the minimum number of <code class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note, this parameter is suitable if <code class="highlighter-rouge">execution.state-recovery.from-local</code> is not enabled. More details about this configuration are available at <a href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>When enabled, the Adaptive Scheduler actively triggers a checkpoint when resources change and rescaling is desired, rather than waiting for the next periodic checkpoint. This reduces rescaling latency, especially when checkpoint intervals are large. The active trigger respects execution.checkpointing.min-pause and will not fire if a checkpoint is already in progress or being triggered.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,24 @@ public InlineElement getDescription() {
.key()))
.build());

@Documentation.Section({
Documentation.Sections.EXPERT_SCHEDULING,
Documentation.Sections.ALL_JOB_MANAGER
})
public static final ConfigOption<Boolean> SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED =
key("jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
Description.builder()
.text(
"When enabled, the Adaptive Scheduler actively triggers a checkpoint when resources change and rescaling is desired, "
+ "rather than waiting for the next periodic checkpoint. "
+ "This reduces rescaling latency, especially when checkpoint intervals are large. "
+ "The active trigger respects execution.checkpointing.min-pause and "
+ "will not fire if a checkpoint is already in progress or being triggered.")
.build());

/**
* @deprecated Use {@link
* JobManagerOptions#SCHEDULER_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1996,6 +1996,16 @@ public long getCheckpointTimeout() {
return checkpointTimeout;
}

/**
* Returns {@code true} if enough time has elapsed since the last checkpoint completion to
* satisfy the configured {@code minPauseBetweenCheckpoints}. This can be used by callers that
* trigger non-periodic checkpoints but still wish to respect the min-pause constraint.
*/
public boolean isMinPauseBetweenCheckpointsSatisfied() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pnowojski Is this safe without lock?

return clock.relativeTimeMillis() - lastCheckpointCompletionRelativeTime
>= minPauseBetweenCheckpoints;
}

/**
* @deprecated use {@link #getNumQueuedRequests()}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,9 @@ public static Settings of(
SCHEDULER_RESCALE_TRIGGER_MAX_DELAY,
maximumDelayForRescaleTriggerDefault),
rescaleOnFailedCheckpointsCount,
configuration.get(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE));
configuration.get(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE),
configuration.get(
JobManagerOptions.SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED));
}

private final SchedulerExecutionMode executionMode;
Expand All @@ -326,6 +328,7 @@ public static Settings of(
private final Duration maximumDelayForTriggeringRescale;
private final int rescaleOnFailedCheckpointCount;
private final int rescaleHistoryMax;
private final boolean activeCheckpointTriggerEnabled;

private Settings(
SchedulerExecutionMode executionMode,
Expand All @@ -336,7 +339,8 @@ private Settings(
Duration executingResourceStabilizationTimeout,
Duration maximumDelayForTriggeringRescale,
int rescaleOnFailedCheckpointCount,
int rescaleHistoryMax) {
int rescaleHistoryMax,
boolean activeCheckpointTriggerEnabled) {
this.executionMode = executionMode;
this.submissionResourceWaitTimeout = submissionResourceWaitTimeout;
this.submissionResourceStabilizationTimeout = submissionResourceStabilizationTimeout;
Expand All @@ -346,6 +350,7 @@ private Settings(
this.maximumDelayForTriggeringRescale = maximumDelayForTriggeringRescale;
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
this.rescaleHistoryMax = rescaleHistoryMax;
this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled;
}

public SchedulerExecutionMode getExecutionMode() {
Expand Down Expand Up @@ -384,6 +389,10 @@ public int getRescaleHistoryMax() {
return rescaleHistoryMax;
}

public boolean isActiveCheckpointTriggerEnabled() {
return activeCheckpointTriggerEnabled;
}

public JobRescaleConfigInfo toJobRescaleConfigInfo() {
return new JobRescaleConfigInfo(
rescaleHistoryMax,
Expand Down Expand Up @@ -1311,7 +1320,8 @@ public void goToExecuting(
userCodeClassLoader,
failureCollection,
this::createExecutingStateTransitionManager,
settings.getRescaleOnFailedCheckpointCount()));
settings.getRescaleOnFailedCheckpointCount(),
settings.isActiveCheckpointTriggerEnabled()));
}

private StateTransitionManager createExecutingStateTransitionManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ private void progressToStabilized(Temporal firstChangeEventTimestamp) {
progressToPhase(new Stabilized(clock, this, firstChangeEventTimestamp, maxTriggerDelay));
}

void requestActiveCheckpointTrigger() {
transitionContext.requestActiveCheckpointTrigger();
}

private void triggerTransitionToSubsequentState() {
progressToPhase(new Transitioning(clock, this));
transitionContext.transitionToSubsequentState();
Expand Down Expand Up @@ -362,6 +366,7 @@ private Stabilizing(
resourceStabilizationTimeout);

scheduleTransitionEvaluation();
context().requestActiveCheckpointTrigger();
}

@Override
Expand All @@ -370,6 +375,7 @@ void onChange(boolean newResourceDriven) {
// event was already handled by a onTrigger callback with a no-op
onChangeEventTimestamp = now();
scheduleTransitionEvaluation();
context().requestActiveCheckpointTrigger();
}

@Override
Expand Down Expand Up @@ -427,6 +433,7 @@ private Stabilized(
},
firstChangeEventTimestamp,
maxTriggerDelay);
context().requestActiveCheckpointTrigger();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
Expand All @@ -40,6 +41,7 @@
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;

import org.slf4j.Logger;

Expand All @@ -64,6 +66,7 @@ class Executing extends StateWithExecutionGraph

private final StateTransitionManager stateTransitionManager;
private final int rescaleOnFailedCheckpointCount;
private final boolean activeCheckpointTriggerEnabled;
// null indicates that there was no change event observed, yet
@Nullable private AtomicInteger failedCheckpointCountdown;

Expand All @@ -77,7 +80,8 @@ class Executing extends StateWithExecutionGraph
List<ExceptionHistoryEntry> failureCollection,
Function<StateTransitionManager.Context, StateTransitionManager>
stateTransitionManagerFactory,
int rescaleOnFailedCheckpointCount) {
int rescaleOnFailedCheckpointCount,
boolean activeCheckpointTriggerEnabled) {
super(
context,
executionGraph,
Expand All @@ -96,6 +100,7 @@ class Executing extends StateWithExecutionGraph
rescaleOnFailedCheckpointCount > 0,
"The rescaleOnFailedCheckpointCount should be larger than 0.");
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled;
this.failedCheckpointCountdown = null;

recordRescaleForJobIntoExecuting(logger, context);
Expand Down Expand Up @@ -182,6 +187,74 @@ public ScheduledFuture<?> scheduleOperation(Runnable callback, Duration delay) {
return context.runIfState(this, callback, delay);
}

private static final Duration ACTIVE_CHECKPOINT_RETRY_DELAY = Duration.ofSeconds(10);

@Override
public void requestActiveCheckpointTrigger() {
if (!activeCheckpointTriggerEnabled) {
return;
}

final CheckpointCoordinator checkpointCoordinator =
getExecutionGraph().getCheckpointCoordinator();

if (checkpointCoordinator == null
|| !checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
getLogger()
.debug(
"Skipping active checkpoint trigger for rescale: checkpointing not configured.");
return;
}

if (!parallelismChanged()) {
getLogger()
.debug(
"Skipping active checkpoint trigger for rescale: parallelism unchanged.");
return;
}

if (checkpointCoordinator.getNumberOfPendingCheckpoints() > 0
|| checkpointCoordinator.isTriggering()) {
getLogger()
.debug(
"Skipping active checkpoint trigger for rescale: checkpoint already in progress.");
return;
}

if (!checkpointCoordinator.isMinPauseBetweenCheckpointsSatisfied()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you return a remaining time to the next checkpoint instead of boolean, then you can directly use it in following call of context.runIfState( this, this::requestActiveCheckpointTrigger, remainingTimeToSatisfyMinPause) and you can get rid of hardcoded ACTIVE_CHECKPOINT_RETRY_DELAY .

getLogger()
.debug(
"Skipping active checkpoint trigger for rescale: min pause between checkpoints not satisfied, scheduling retry.");
context.runIfState(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we get e.g. 10 onChange events then we will schedule this method 10 times. We should have some kind of deduplication.

this, this::requestActiveCheckpointTrigger, ACTIVE_CHECKPOINT_RETRY_DELAY);
return;
}

getLogger().info("Actively triggering checkpoint to expedite rescaling.");
FutureUtils.assertNoException(
checkpointCoordinator
.triggerCheckpoint(false)
.handle(
(completedCheckpoint, throwable) -> {
if (throwable != null) {
getLogger()
.warn(
"Active checkpoint trigger for rescale failed, scheduling retry.",
throwable);
context.runIfState(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to introduce this endless cycle? I feel we should just log it and give it up. If the checkpoint is failing there probably is different issue with job and we shouldn't try it again and again without any retry cap.

this,
this::requestActiveCheckpointTrigger,
ACTIVE_CHECKPOINT_RETRY_DELAY);
} else {
getLogger()
.info(
"Active checkpoint for rescale completed successfully: {}.",
completedCheckpoint.getCheckpointID());
}
return null;
}));
}

@Override
public void transitionToSubsequentState() {
Optional<VertexParallelism> availableVertexParallelism =
Expand Down Expand Up @@ -399,6 +472,7 @@ static class Factory implements StateFactory<Executing> {
private final Function<StateTransitionManager.Context, StateTransitionManager>
stateTransitionManagerFactory;
private final int rescaleOnFailedCheckpointCount;
private final boolean activeCheckpointTriggerEnabled;

Factory(
ExecutionGraph executionGraph,
Expand All @@ -410,7 +484,8 @@ static class Factory implements StateFactory<Executing> {
List<ExceptionHistoryEntry> failureCollection,
Function<StateTransitionManager.Context, StateTransitionManager>
stateTransitionManagerFactory,
int rescaleOnFailedCheckpointCount) {
int rescaleOnFailedCheckpointCount,
boolean activeCheckpointTriggerEnabled) {
this.context = context;
this.log = log;
this.executionGraph = executionGraph;
Expand All @@ -420,6 +495,7 @@ static class Factory implements StateFactory<Executing> {
this.failureCollection = failureCollection;
this.stateTransitionManagerFactory = stateTransitionManagerFactory;
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled;
}

public Class<Executing> getStateClass() {
Expand All @@ -436,7 +512,8 @@ public Executing getState() {
userCodeClassLoader,
failureCollection,
stateTransitionManagerFactory,
rescaleOnFailedCheckpointCount);
rescaleOnFailedCheckpointCount,
activeCheckpointTriggerEnabled);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,5 +89,24 @@ interface Context extends RescaleContext {
* @return the {@link JobID} of the job
*/
JobID getJobId();

/**
* Requests the context to actively trigger a checkpoint to expedite rescaling. Called by
* the {@link DefaultStateTransitionManager} from within phase lifecycle methods:
*
* <ul>
* <li>On entering {@link DefaultStateTransitionManager.Stabilizing} (to overlap
* checkpoint with the stabilization wait)
* <li>On each {@link DefaultStateTransitionManager.Stabilizing#onChange} event (retry if
* a previous trigger was skipped)
* <li>On entering {@link DefaultStateTransitionManager.Stabilized} (fallback if no
* checkpoint completed during stabilization)
* </ul>
*
* <p>The implementation decides whether to actually trigger based on its own guard
* conditions (e.g., checkpointing enabled, no checkpoint in progress, config flag).
* Multiple calls are safe; guards prevent redundant triggers.
*/
default void requestActiveCheckpointTrigger() {}
}
}
Loading