@@ -745,6 +745,24 @@ public InlineElement getDescription() {
.key()))
.build());

@Documentation.Section({
Documentation.Sections.EXPERT_SCHEDULING,
Documentation.Sections.ALL_JOB_MANAGER
})
public static final ConfigOption<Boolean> SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED =
key("jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled")
.booleanType()
.defaultValue(false)
Contributor: Is there a downside to using this option? If we expect it to be a generally positive change, and it is disabled by default only as a precaution/for backward compatibility, I would actually be fine with setting it to true by default.

Contributor Author (@Samrat002, Apr 16, 2026): Earlier, I chose a defensive approach. There is no compelling reason to keep it false. Updated the default value to true.

.withDescription(
Description.builder()
.text(
"When enabled, the Adaptive Scheduler actively triggers a checkpoint when resources change and rescaling is desired, "
+ "rather than waiting for the next periodic checkpoint. "
+ "This reduces rescaling latency, especially when checkpoint intervals are large. "
+ "The active trigger respects %s and will not trigger if a checkpoint is already in progress.",
Contributor (@ztison, Apr 15, 2026): Does it really respect the min-pause? From what I can see, it only ensures that a new checkpoint is not triggered while another one is in progress.

Contributor Author: Ah, the check for min-pause was missing. PTAL.

text("execution.checkpointing.min-pause"))
Contributor: This should probably be:
code(CheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS.key()))

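The min-pause behavior discussed in this thread can be sketched as a tiny standalone guard. This is illustrative only: the class and method names below are assumptions for the sketch, not Flink's actual CheckpointCoordinator API.

```java
// Models the "respect min-pause" guard: an active rescale trigger should be
// suppressed until execution.checkpointing.min-pause has elapsed since the
// last checkpoint completed. All names here are illustrative.
final class MinPauseGuard {
    private final long minPauseMillis;
    private long lastCompletionMillis;

    MinPauseGuard(long minPauseMillis) {
        this.minPauseMillis = minPauseMillis;
        // No checkpoint has completed yet: allow triggering immediately.
        this.lastCompletionMillis = Long.MIN_VALUE / 2;
    }

    void onCheckpointCompleted(long nowMillis) {
        lastCompletionMillis = nowMillis;
    }

    boolean mayTrigger(long nowMillis) {
        return nowMillis - lastCompletionMillis >= minPauseMillis;
    }
}
```

In the real coordinator this check sits next to the in-progress check, so an active trigger that arrives right after a periodic checkpoint is simply skipped rather than queued.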
.build());

/**
* @deprecated Use {@link
* JobManagerOptions#SCHEDULER_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT}.
@@ -314,7 +314,9 @@ public static Settings of(
SCHEDULER_RESCALE_TRIGGER_MAX_DELAY,
maximumDelayForRescaleTriggerDefault),
rescaleOnFailedCheckpointsCount,
configuration.get(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE));
configuration.get(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE),
configuration.get(
JobManagerOptions.SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED));
}

private final SchedulerExecutionMode executionMode;
@@ -326,6 +328,7 @@ public static Settings of(
private final Duration maximumDelayForTriggeringRescale;
private final int rescaleOnFailedCheckpointCount;
private final int rescaleHistoryMax;
private final boolean activeCheckpointTriggerEnabled;

private Settings(
SchedulerExecutionMode executionMode,
@@ -336,7 +339,8 @@ private Settings(
Duration executingResourceStabilizationTimeout,
Duration maximumDelayForTriggeringRescale,
int rescaleOnFailedCheckpointCount,
int rescaleHistoryMax) {
int rescaleHistoryMax,
boolean activeCheckpointTriggerEnabled) {
this.executionMode = executionMode;
this.submissionResourceWaitTimeout = submissionResourceWaitTimeout;
this.submissionResourceStabilizationTimeout = submissionResourceStabilizationTimeout;
@@ -346,6 +350,7 @@ private Settings(
this.maximumDelayForTriggeringRescale = maximumDelayForTriggeringRescale;
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
this.rescaleHistoryMax = rescaleHistoryMax;
this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled;
}

public SchedulerExecutionMode getExecutionMode() {
@@ -384,6 +389,10 @@ public int getRescaleHistoryMax() {
return rescaleHistoryMax;
}

public boolean isActiveCheckpointTriggerEnabled() {
return activeCheckpointTriggerEnabled;
}

public JobRescaleConfigInfo toJobRescaleConfigInfo() {
return new JobRescaleConfigInfo(
rescaleHistoryMax,
@@ -1311,7 +1320,8 @@ public void goToExecuting(
userCodeClassLoader,
failureCollection,
this::createExecutingStateTransitionManager,
settings.getRescaleOnFailedCheckpointCount()));
settings.getRescaleOnFailedCheckpointCount(),
settings.isActiveCheckpointTriggerEnabled()));
}

private StateTransitionManager createExecutingStateTransitionManager(
@@ -158,10 +158,16 @@ private void progressToStabilizing(Temporal firstChangeEventTimestamp) {
resourceStabilizationTimeout,
firstChangeEventTimestamp,
maxTriggerDelay));
transitionContext.requestActiveCheckpointTrigger();
Contributor: Why is this call needed here? Isn't it enough to call it in org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Stabilizing#onChange?

}

private void progressToStabilized(Temporal firstChangeEventTimestamp) {
progressToPhase(new Stabilized(clock, this, firstChangeEventTimestamp, maxTriggerDelay));
transitionContext.requestActiveCheckpointTrigger();
Contributor: The method is now called in many places. I wonder whether we could confine the calls to the Phases themselves, i.e. move this into the Stabilized phase.

}

void requestActiveCheckpointTrigger() {
transitionContext.requestActiveCheckpointTrigger();
}

private void triggerTransitionToSubsequentState() {
@@ -370,6 +376,7 @@ void onChange(boolean newResourceDriven) {
// event was already handled by a onTrigger callback with a no-op
onChangeEventTimestamp = now();
scheduleTransitionEvaluation();
context().requestActiveCheckpointTrigger();
}

@Override
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
@@ -64,6 +65,7 @@ class Executing extends StateWithExecutionGraph

private final StateTransitionManager stateTransitionManager;
private final int rescaleOnFailedCheckpointCount;
private final boolean activeCheckpointTriggerEnabled;
// null indicates that there was no change event observed, yet
@Nullable private AtomicInteger failedCheckpointCountdown;

@@ -77,7 +79,8 @@ class Executing extends StateWithExecutionGraph
List<ExceptionHistoryEntry> failureCollection,
Function<StateTransitionManager.Context, StateTransitionManager>
stateTransitionManagerFactory,
int rescaleOnFailedCheckpointCount) {
int rescaleOnFailedCheckpointCount,
boolean activeCheckpointTriggerEnabled) {
super(
context,
executionGraph,
@@ -96,6 +99,7 @@ class Executing extends StateWithExecutionGraph
rescaleOnFailedCheckpointCount > 0,
"The rescaleOnFailedCheckpointCount should be larger than 0.");
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled;
this.failedCheckpointCountdown = null;

recordRescaleForJobIntoExecuting(logger, context);
@@ -182,6 +186,11 @@ public ScheduledFuture<?> scheduleOperation(Runnable callback, Duration delay) {
return context.runIfState(this, callback, delay);
}

@Override
public void requestActiveCheckpointTrigger() {
triggerCheckpointForRescale();
}

@Override
public void transitionToSubsequentState() {
Optional<VertexParallelism> availableVertexParallelism =
@@ -300,6 +309,69 @@ private void initializeFailedCheckpointCountdownIfUnset() {
}
}

/**
* Actively triggers a checkpoint to expedite rescaling. Without this, the scheduler would
* passively wait for the next periodic checkpoint, which could delay rescaling significantly
* when checkpoint intervals are large.
*
* <p>Guard conditions:
*
* <ul>
* <li>Checkpointing must be configured
* <li>Parallelism must have actually changed
* <li>No checkpoint must be currently in progress or being triggered
* </ul>
*/
private void triggerCheckpointForRescale() {
Contributor: We need to cover all possible paths with tests.

Contributor Author: Added tests, PTAL.

(Samrat002 marked this conversation as resolved; outdated.)
if (!activeCheckpointTriggerEnabled) {
return;
}

final CheckpointCoordinator checkpointCoordinator =
getExecutionGraph().getCheckpointCoordinator();

if (checkpointCoordinator == null
|| !checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
getLogger()
.debug(
"Skipping active checkpoint trigger for rescale: checkpointing not configured.");
return;
}

if (!parallelismChanged()) {
getLogger()
.debug(
"Skipping active checkpoint trigger for rescale: parallelism unchanged.");
return;
}

if (checkpointCoordinator.getNumberOfPendingCheckpoints() > 0
|| checkpointCoordinator.isTriggering()) {
getLogger()
.debug(
"Skipping active checkpoint trigger for rescale: checkpoint already in progress.");
return;
}

getLogger().info("Actively triggering checkpoint to expedite rescaling.");
checkpointCoordinator
.triggerCheckpoint(false)
.whenComplete(
(completedCheckpoint, throwable) -> {
if (throwable != null) {
getLogger()
.warn(
"Active checkpoint trigger for rescale failed.",
throwable);
} else {
getLogger()
.info(
"Active checkpoint for rescale completed successfully: {}.",
completedCheckpoint.getCheckpointID());
}
});
(Samrat002 marked this conversation as resolved; outdated.)
}
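As a testing aid, the guard sequence in triggerCheckpointForRescale() can be modeled as a small standalone predicate. The class below is purely illustrative (names are assumptions, not Flink API); it only captures the decision logic, not the coordinator interaction.

```java
// Standalone model of the guard conditions: trigger only if the feature is
// enabled, periodic checkpointing is configured, parallelism actually
// changed, and no checkpoint is pending or currently being triggered.
final class ActiveTriggerGuard {
    final boolean enabled;
    final boolean checkpointingConfigured;
    final boolean parallelismChanged;
    final int pendingCheckpoints;
    final boolean triggering;

    ActiveTriggerGuard(boolean enabled, boolean checkpointingConfigured,
            boolean parallelismChanged, int pendingCheckpoints, boolean triggering) {
        this.enabled = enabled;
        this.checkpointingConfigured = checkpointingConfigured;
        this.parallelismChanged = parallelismChanged;
        this.pendingCheckpoints = pendingCheckpoints;
        this.triggering = triggering;
    }

    boolean shouldTrigger() {
        return enabled
                && checkpointingConfigured
                && parallelismChanged
                && pendingCheckpoints == 0
                && !triggering;
    }
}
```

Each failed conjunct corresponds to one of the early returns in the method above, which makes it straightforward to enumerate all skip paths in a unit test.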

CompletableFuture<String> stopWithSavepoint(
@Nullable final String targetDirectory,
boolean terminate,
@@ -399,6 +471,7 @@ static class Factory implements StateFactory<Executing> {
private final Function<StateTransitionManager.Context, StateTransitionManager>
stateTransitionManagerFactory;
private final int rescaleOnFailedCheckpointCount;
private final boolean activeCheckpointTriggerEnabled;

Factory(
ExecutionGraph executionGraph,
@@ -410,7 +483,8 @@ static class Factory implements StateFactory<Executing> {
List<ExceptionHistoryEntry> failureCollection,
Function<StateTransitionManager.Context, StateTransitionManager>
stateTransitionManagerFactory,
int rescaleOnFailedCheckpointCount) {
int rescaleOnFailedCheckpointCount,
boolean activeCheckpointTriggerEnabled) {
this.context = context;
this.log = log;
this.executionGraph = executionGraph;
@@ -420,6 +494,7 @@ static class Factory implements StateFactory<Executing> {
this.failureCollection = failureCollection;
this.stateTransitionManagerFactory = stateTransitionManagerFactory;
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled;
}

public Class<Executing> getStateClass() {
@@ -436,7 +511,8 @@ public Executing getState() {
userCodeClassLoader,
failureCollection,
stateTransitionManagerFactory,
rescaleOnFailedCheckpointCount);
rescaleOnFailedCheckpointCount,
activeCheckpointTriggerEnabled);
}
}
}
@@ -89,5 +89,14 @@ interface Context extends RescaleContext {
* @return the {@link JobID} of the job
*/
JobID getJobId();

/**
 * Requests the context to actively trigger a checkpoint to expedite rescaling. Called when
 * the {@link DefaultStateTransitionManager} enters a phase that is ready to accept {@link
 * #onTrigger()} events (i.e., {@link DefaultStateTransitionManager.Stabilizing}). The
 * implementation decides whether to actually trigger based on its own guard conditions
 * (e.g., checkpointing enabled, no checkpoint in progress, config flag).
 */
default void requestActiveCheckpointTrigger() {}

Contributor: Is that accurate? The method is called in more places: on entering Stabilizing, on entering Stabilized, and on each onChange event while in Stabilizing.

Contributor Author: I have fixed the problem now. PTAL at the revised version.
}
}
@@ -160,7 +160,8 @@ void testNoDeploymentCallOnEnterWhenVertexRunning() throws Exception {
ClassLoader.getSystemClassLoader(),
new ArrayList<>(),
(context) -> TestingStateTransitionManager.withNoOp(),
1);
1,
false);
assertThat(mockExecutionVertex.isDeployCalled()).isFalse();
}
}
@@ -186,7 +187,8 @@ void testIllegalStateExceptionOnNotRunningExecutionGraph() {
ClassLoader.getSystemClassLoader(),
new ArrayList<>(),
context -> TestingStateTransitionManager.withNoOp(),
1);
1,
false);
}
})
.isInstanceOf(IllegalStateException.class);
@@ -691,6 +693,7 @@ private final class ExecutingStateBuilder {
private Function<StateTransitionManager.Context, StateTransitionManager>
stateTransitionManagerFactory = context -> TestingStateTransitionManager.withNoOp();
private int rescaleOnFailedCheckpointCount = 1;
private boolean activeCheckpointTriggerEnabled = false;

private ExecutingStateBuilder() throws JobException, JobExecutionException {
operatorCoordinatorHandler = new TestingOperatorCoordinatorHandler();
@@ -733,7 +736,8 @@ private Executing build(MockExecutingContext ctx) {
ClassLoader.getSystemClassLoader(),
new ArrayList<>(),
stateTransitionManagerFactory::apply,
rescaleOnFailedCheckpointCount);
rescaleOnFailedCheckpointCount,
activeCheckpointTriggerEnabled);
} finally {
Preconditions.checkState(
!ctx.hadStateTransition,
@@ -1029,6 +1033,12 @@ public boolean updateState(TaskExecutionStateTransition state) {
public Iterable<ExecutionJobVertex> getVerticesTopologically() {
return getVerticesTopologicallySupplier.get();
}

@Nullable
@Override
public CheckpointCoordinator getCheckpointCoordinator() {
return null;
}
}

private static class FinishingMockExecutionGraph extends StateTrackingMockExecutionGraph {
@@ -175,4 +175,59 @@ void testRescaleOnCheckpoint(
restClusterClient.cancel(jobGraph.getJobID()).join();
}
}

@Test
void testRescaleWithActiveCheckpointTrigger(
@InjectMiniCluster MiniCluster miniCluster,
@InjectClusterClient RestClusterClient<?> restClusterClient)
throws Exception {
final Configuration config = new Configuration();

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
env.setParallelism(BEFORE_RESCALE_PARALLELISM);
env.enableCheckpointing(Duration.ofHours(1).toMillis());
env.fromSequence(0, Integer.MAX_VALUE).sinkTo(new DiscardingSink<>());

final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
final Iterator<JobVertex> jobVertexIterator = jobGraph.getVertices().iterator();
assertThat(jobVertexIterator.hasNext()).isTrue();
final JobVertexID jobVertexId = jobVertexIterator.next().getID();

final JobResourceRequirements jobResourceRequirements =
JobResourceRequirements.newBuilder()
.setParallelismForJobVertex(jobVertexId, 1, AFTER_RESCALE_PARALLELISM)
.build();

restClusterClient.submitJob(jobGraph).join();

final JobID jobId = jobGraph.getJobID();
try {
LOG.info(
"Waiting for job {} to reach parallelism of {} for vertex {}.",
jobId,
BEFORE_RESCALE_PARALLELISM,
jobVertexId);
waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM);

LOG.info(
"Updating job {} resource requirements: parallelism {} -> {}.",
jobId,
BEFORE_RESCALE_PARALLELISM,
AFTER_RESCALE_PARALLELISM);
restClusterClient.updateJobResourceRequirements(jobId, jobResourceRequirements).join();
LOG.info(
"Waiting for job {} to rescale to parallelism {} via active checkpoint trigger.",
jobId,
AFTER_RESCALE_PARALLELISM);
waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM);
final int expectedFreeSlotCount = NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM;
LOG.info(
"Waiting for {} slot(s) to become available after scale down.",
expectedFreeSlotCount);
waitForAvailableSlots(restClusterClient, expectedFreeSlotCount);
Comment on lines +224 to +229

Contributor: If I understand the test correctly, it would still pass after one hour, once the regular periodic checkpoint fires, even with the new option disabled. You should make sure that the timeouts in waitForRunningTasks/waitForAvailableSlots (and the ~4h CI timeout) stay shorter than the interval set via env.enableCheckpointing(Duration.ofHours(1).toMillis()), otherwise the test does not actually verify the active trigger. So either decrease the wait timeout to under 30 minutes, or increase the checkpointing interval to 24h (CI is killed after 4h, AFAIR).

Contributor Author: Updated checkpointing to 24 hours and also added assertions for min-pause.

} finally {
restClusterClient.cancel(jobGraph.getJobID()).join();
}
}
}
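The timing concern raised in the last review thread reduces to a simple invariant: every wait in the test, and the CI job itself, must end well before a periodic checkpoint could fire, otherwise a passing run proves nothing about the active trigger. A sketch of that invariant (the 30-minute wait value is an assumption; only the 24h interval and ~4h CI limit come from the thread):

```java
import java.time.Duration;

// Sketch of the test-timing invariant: the rescale must complete within the
// waiting window, and neither the wait timeout nor the CI kill limit may
// reach the periodic checkpoint interval.
final class RescaleTestTiming {
    static final long CHECKPOINT_INTERVAL_MS = Duration.ofHours(24).toMillis();
    static final long WAIT_TIMEOUT_MS = Duration.ofMinutes(30).toMillis(); // assumed
    static final long CI_KILL_MS = Duration.ofHours(4).toMillis();

    static boolean testIsMeaningful() {
        return WAIT_TIMEOUT_MS < CHECKPOINT_INTERVAL_MS
                && CI_KILL_MS < CHECKPOINT_INTERVAL_MS;
    }
}
```

With the 24-hour interval adopted in the PR, both bounds hold with large margin, so a successful rescale can only be explained by the actively triggered checkpoint.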